You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/01/26 23:10:22 UTC

svn commit: r1236411 - in /incubator/accumulo/branches/1.4: docs/ src/server/src/main/java/org/apache/accumulo/server/iterators/ src/server/src/main/java/org/apache/accumulo/server/master/ src/server/src/main/java/org/apache/accumulo/server/tabletserve...

Author: kturner
Date: Thu Jan 26 22:10:21 2012
New Revision: 1236411

URL: http://svn.apache.org/viewvc?rev=1236411&view=rev
Log:
ACCUMULO-334 Made splits copy bulk load flags.  Added metadata iterator to delete inactive bulk load flags.  Made bulk RW test check all markers are present only once.

Added:
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
Modified:
    incubator/accumulo/branches/1.4/docs/config.html
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java

Modified: incubator/accumulo/branches/1.4/docs/config.html
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/docs/config.html?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/docs/config.html (original)
+++ incubator/accumulo/branches/1.4/docs/config.html Thu Jan 26 22:10:21 2012
@@ -521,62 +521,69 @@ $HADOOP_HOME/lib/[^.].*.jar,
     <td>determines if logs are archived in hdfs</td>
    </tr>
    <tr >
+    <td>logger.archive.replication</td>
+    <td><b><a href='#COUNT'>count</a></b></td>
+    <td>yes</td>
+    <td><pre>0</pre></td>
+    <td>determines the replication factor for walogs archived in hdfs, set to zero to use default</td>
+   </tr>
+   <tr class='highlight'>
     <td>logger.copy.threadpool.size</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>2</pre></td>
     <td>size of the thread pool used to copy files from the local log area to HDFS</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>logger.dir.walog</td>
     <td><b><a href='#PATH'>path</a></b></td>
     <td>yes</td>
     <td><pre>walogs</pre></td>
     <td>The directory used to store write-ahead logs on the local filesystem. It is possible to specify a comma-separated list of directories.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>logger.monitor.fs</td>
     <td><b><a href='#BOOLEAN'>boolean</a></b></td>
     <td>yes</td>
     <td><pre>true</pre></td>
     <td>When enabled the logger will monitor file systems and kill itself when one switches from rw to ro.  This is usually and indication that Linux has detected a bad disk.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>logger.port.client</td>
     <td><b><a href='#PORT'>port</a></b></td>
     <td>yes but requires restart</td>
     <td><pre>11224</pre></td>
     <td>The port used for write-ahead logger services</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>logger.port.search</td>
     <td><b><a href='#BOOLEAN'>boolean</a></b></td>
     <td>yes</td>
     <td><pre>false</pre></td>
     <td>if the port above is in use, search higher ports until one is available</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>logger.recovery.file.replication</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>1</pre></td>
     <td>When a logger puts a WALOG into HDFS, it will use this as the replication factor.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>logger.server.threadcheck.time</td>
     <td><b><a href='#TIMEDURATION'>duration</a></b></td>
     <td>yes</td>
     <td><pre>1s</pre></td>
     <td>The time between adjustments of the server thread pool.</td>
    </tr>
-   <tr class='highlight'>
+   <tr >
     <td>logger.server.threads.minimum</td>
     <td><b><a href='#COUNT'>count</a></b></td>
     <td>yes</td>
     <td><pre>2</pre></td>
     <td>The miniumum number of threads to use to handle incoming requests.</td>
    </tr>
-   <tr >
+   <tr class='highlight'>
     <td>logger.sort.buffer.size</td>
     <td><b><a href='#MEMORY'>memory</a></b></td>
     <td>yes</td>

Added: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java?rev=1236411&view=auto
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java (added)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java Thu Jan 26 22:10:21 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.iterators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+
+/**
+ * A special iterator for the metadata table that removes inactive bulk load flags
+ * 
+ */
+public class MetadataBulkLoadFilter extends Filter {
+  
+  enum Status {
+    ACTIVE, INACTIVE
+  }
+  
+  Map<Long,Status> bulkTxStatusCache;
+  ZooArbitrator arbitrator;
+  
+  @Override
+  public boolean accept(Key k, Value v) {
+    if (!k.isDeleted() && k.compareColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY) == 0) {
+      long txid = Long.valueOf(v.toString());
+      
+      Status status = bulkTxStatusCache.get(txid);
+      if (status == null) {
+        try {
+          if (arbitrator.transactionAlive(Constants.BULK_ARBITRATOR_TYPE, txid)) {
+            status = Status.ACTIVE;
+          } else {
+            status = Status.INACTIVE;
+          }
+        } catch (Exception e) {
+          // TODO log
+          status = Status.ACTIVE;
+        }
+        
+        bulkTxStatusCache.put(txid, status);
+      }
+      
+      return status == Status.ACTIVE;
+    }
+
+    return true;
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    
+    if (env.getIteratorScope() == IteratorScope.scan) {
+      throw new IOException("This iterator not intended for use at scan time");
+    }
+
+    bulkTxStatusCache = new HashMap<Long,MetadataBulkLoadFilter.Status>();
+    arbitrator = new ZooArbitrator();
+  }
+}

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Thu Jan 26 22:10:21 2012
@@ -93,6 +93,7 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Fate;
 import org.apache.accumulo.server.fate.TStore.TStatus;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete;
 import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
@@ -328,6 +329,9 @@ public class Master implements LiveTServ
 
         IZooReaderWriter zoo = ZooReaderWriter.getInstance();
         
+        TablePropUtil.setTableProperty(Constants.METADATA_TABLE_ID, Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20,"
+            + MetadataBulkLoadFilter.class.getName());
+        
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, new byte[0], NodeExistsPolicy.SKIP);
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZHDFS_RESERVATIONS, new byte[0], NodeExistsPolicy.SKIP);
         zoo.putPersistentData(ZooUtil.getRoot(instance) + Constants.ZNEXT_FILE, new byte[] {'0'}, NodeExistsPolicy.SKIP);

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Thu Jan 26 22:10:21 2012
@@ -3434,9 +3434,14 @@ public class Tablet {
       
       String time = tabletTime.getMetadataValue();
       
+      // it is possible that some of the bulk loading flags will be deleted after being read below because the bulk load
+      // finishes.... therefore split could propogate load flags for a finished bulk load... there is a special iterator
+      // on the !METADATA table to clean up this type of garbage
+      Map<String,Long> bulkLoadedFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+
       MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), tabletServer.getLock());
-      MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, SecurityConstants.getSystemCredentials(), time,
-          lastFlushID, lastCompactID, tabletServer.getLock());
+      MetadataTable.addNewTablet(low, lowDirectory, tabletServer.getTabletSession(), lowDatafileSizes, bulkLoadedFiles,
+          SecurityConstants.getSystemCredentials(), time, lastFlushID, lastCompactID, tabletServer.getLock());
       MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), tabletServer.getLock());
       
       log.log(TLevel.TABLET_HIST, extent + " split " + low + " " + high);

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Thu Jan 26 22:10:21 2012
@@ -164,9 +164,11 @@ public class SplitRecoveryTest extends F
     m.put(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, assignment.server.asColumnQualifier(), assignment.server.asMutationValue());
     writer.update(m);
     
-    if (steps >= 1)
-      MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID + "0", -1l,
-          -1l, zl);
+    if (steps >= 1) {
+      Map<String,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+      MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID
+          + "0", -1l, -1l, zl);
+    }
     if (steps >= 2)
       MetadataTable.finishSplit(high, highDatafileSizes, highDatafilesToRemove, SecurityConstants.getSystemCredentials(), zl);
     

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/BulkPlusOne.java Thu Jan 26 22:10:21 2012
@@ -54,8 +54,10 @@ public class BulkPlusOne extends BulkTes
     }
   }
   public static final Text MARKER_CF = new Text("marker");
-  private static final AtomicLong counter = new AtomicLong();
+  static final AtomicLong counter = new AtomicLong();
   
+  private static final Value ONE = new Value("1".getBytes());
+
   static void bulkLoadLots(Logger log, State state, Value value) throws Exception {
     final Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
     final Path fail = new Path(dir.toString() + "_fail");
@@ -74,8 +76,8 @@ public class BulkPlusOne extends BulkTes
     for (Integer row : startRows)
       printRows.add(String.format(FMT, row));
     
-    String markerColumnFamily = Long.toString(counter.incrementAndGet());
-    log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily);
+    String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
+    log.debug("preparing bulk files with start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
     
     List<Integer> rows = new ArrayList<Integer>(startRows);
     rows.add(LOTS);
@@ -91,7 +93,7 @@ public class BulkPlusOne extends BulkTes
         for (Column col : COLNAMES) {
           f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
         }
-        f.append(new Key(row, MARKER_CF, new Text(markerColumnFamily)), value);
+        f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
       }
       f.close();
     }
@@ -101,7 +103,7 @@ public class BulkPlusOne extends BulkTes
     FileStatus[] failures = fs.listStatus(fail);
     if (failures != null && failures.length > 0)
       throw new Exception("Failures " + Arrays.asList(failures) + " found importing files from " + dir);
-    log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnFamily);
+    log.debug("Finished bulk import, start rows " + printRows + " last row " + String.format(FMT, LOTS - 1) + " marker " + markerColumnQualifier);
   }
   
   @Override

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Setup.java Thu Jan 26 22:10:21 2012
@@ -57,7 +57,7 @@ public class Setup extends Test {
         tableOps.create(getTableName());
         IteratorSetting is = new IteratorSetting(10, org.apache.accumulo.core.iterators.user.SummingCombiner.class);
         SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING);
-        SummingCombiner.setColumns(is, BulkPlusOne.COLNAMES);
+        SummingCombiner.setCombineAllColumns(is, true);
         tableOps.attachIterator(getTableName(), is);
       }
     } catch (TableExistsException ex) {
@@ -65,6 +65,7 @@ public class Setup extends Test {
     }
     state.set("rand", rand);
     state.set("fs", FileSystem.get(CachedConfiguration.getInstance()));
+    BulkPlusOne.counter.set(0l);
     
     BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
     ThreadFactory factory = new ThreadFactory() {

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/randomwalk/bulk/Verify.java Thu Jan 26 22:10:21 2012
@@ -17,6 +17,7 @@
 package org.apache.accumulo.server.test.randomwalk.bulk;
 
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -58,6 +60,37 @@ public class Verify extends Test {
         throw new Exception("Bad key at " + entry);
       }
     }
+    
+    scanner.clearColumns();
+    scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF);
+    RowIterator rowIter = new RowIterator(scanner);
+    
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> row = rowIter.next();
+      long prev = 0;
+      Text rowText = null;
+      while (row.hasNext()) {
+        Entry<Key,Value> entry = row.next();
+        
+        if (rowText == null)
+          rowText = entry.getKey().getRow();
+
+        long curr = Long.valueOf(entry.getKey().getColumnQualifier().toString());
+
+        if (curr - 1 != prev)
+          throw new Exception("Bad marker count " + entry.getKey() + " " + entry.getValue() + " " + prev);
+        
+        if (!entry.getValue().toString().equals("1"))
+          throw new Exception("Bad marker value " + entry.getKey() + " " + entry.getValue());
+        
+        prev = curr;
+      }
+      
+      if (BulkPlusOne.counter.get() != prev) {
+        throw new Exception("Row " + rowText + " does not have all markers " + BulkPlusOne.counter.get() + " " + prev);
+      }
+    }
+
     log.info("Test successful on table " + Setup.getTableName());
     state.getConnector().tableOperations().delete(Setup.getTableName());
   }

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Thu Jan 26 22:10:21 2012
@@ -46,6 +46,7 @@ import org.apache.accumulo.server.Server
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.ZKAuthenticator;
@@ -92,6 +93,7 @@ public class Initialize {
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers.opt.maxVersions", "1");
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers", "10," + VersioningIterator.class.getName());
     initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers.opt.maxVersions", "1");
+    initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName());
     initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
     initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet",
         String.format("%s,%s", Constants.METADATA_TABLET_COLUMN_FAMILY.toString(), Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.toString()));

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1236411&r1=1236410&r2=1236411&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Thu Jan 26 22:10:21 2012
@@ -417,8 +417,8 @@ public class MetadataTable extends org.a
     return sizes;
   }
   
-  public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes, AuthInfo credentials,
-      String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+  public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes,
+      Map<String,Long> bulkLoadedFiles, AuthInfo credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
     Mutation m = extent.getPrevRowUpdateMutation();
     
     ColumnFQ.put(m, Constants.METADATA_DIRECTORY_COLUMN, new Value(path.getBytes()));
@@ -437,6 +437,11 @@ public class MetadataTable extends org.a
       m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
     }
     
+    for (Entry<String,Long> entry : bulkLoadedFiles.entrySet()) {
+      byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
+      m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes));
+    }
+
     update(credentials, zooLock, m);
   }
   
@@ -594,7 +599,8 @@ public class MetadataTable extends org.a
     
     if (!scanner2.iterator().hasNext()) {
       log.debug("Prev tablet " + prevRowKey + " does not exist, need to create it " + metadataPrevEndRow + " " + prevPrevEndRow + " " + splitRatio);
-      MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, credentials, time, initFlushID, initCompactID, lock);
+      Map<String,Long> bulkFiles = getBulkFilesLoaded(credentials, metadataEntry);
+      MetadataTable.addNewTablet(low, lowDirectory, tserver, lowDatafileSizes, bulkFiles, credentials, time, initFlushID, initCompactID, lock);
     } else {
       log.debug("Prev tablet " + prevRowKey + " exist, do not need to add it");
     }
@@ -1245,6 +1251,26 @@ public class MetadataTable extends org.a
     }
   }
   
+  public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, KeyExtent extent) {
+    return getBulkFilesLoaded(credentials, extent.getMetadataEntry());
+  }
+  
+  public static Map<String,Long> getBulkFilesLoaded(AuthInfo credentials, Text metadataRow) {
+    
+    Map<String,Long> ret = new HashMap<String,Long>();
+    
+    Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
+    scanner.setRange(new Range(metadataRow));
+    scanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+    for (Entry<Key,Value> entry : scanner) {
+      String file = entry.getKey().getColumnQualifier().toString();
+      Long tid = Long.parseLong(entry.getValue().toString());
+      
+      ret.put(file, tid);
+    }
+    return ret;
+  }
+
   public static void addBulkLoadInProgressFlag(String path) {
     
     Mutation m = new Mutation(Constants.METADATA_BLIP_FLAG_PREFIX + path);