Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 03:59:45 UTC

[26/50] [abbrv] git commit: ACCUMULO-2819 Mostly working sequential work assigner that preserves correct ordering of files within a table and peer.

ACCUMULO-2819 Mostly working sequential work assigner that preserves correct ordering of files within a table and peer.

New test which is mostly passing; the remaining failures appear to stem from incorrectness
in the implementation of drain more than anything else.
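
The ordering guarantee described above amounts to keeping at most one file in flight per (peer, source table) pair and always offering the oldest closed file first. A minimal sketch of that queueing discipline, with made-up class and method names (the actual SequentialWorkAssigner is not part of this diff):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not the actual SequentialWorkAssigner: at most one
    // file may be in flight per (peer, source table), and files are offered in
    // closed-time order, which the OrderSection row layout provides.
    public class SequentialAssignmentSketch {
      private final Map<String,Map<String,String>> queuedWorkByPeer = new HashMap<String,Map<String,String>>();

      public void queueWork(String peer, String sourceTableId, String file) {
        Map<String,String> queuedForPeer = queuedWorkByPeer.get(peer);
        if (null == queuedForPeer) {
          queuedForPeer = new HashMap<String,String>();
          queuedWorkByPeer.put(peer, queuedForPeer);
        }
        // An earlier file for this table is still pending: skip this one and
        // let a later pass pick it up once the current file completes.
        if (queuedForPeer.containsKey(sourceTableId)) {
          return;
        }
        queuedForPeer.put(sourceTableId, file);
        // hand the file off to the DistributedWorkQueue here
      }
    }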


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ec2d8ddc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ec2d8ddc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ec2d8ddc

Branch: refs/heads/ACCUMULO-378
Commit: ec2d8ddc790f3420d97a3e0534bf98a5d547940f
Parents: 28274ae
Author: Josh Elser <el...@apache.org>
Authored: Sun May 18 02:03:58 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun May 18 02:03:58 2014 -0400

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |  57 ++-
 .../core/metadata/schema/MetadataSchema.java    |   2 +
 .../core/replication/ReplicationSchema.java     |  13 +-
 .../apache/accumulo/server/fs/VolumeUtil.java   |   6 +-
 .../master/replication/StatusMaker.java         |  12 +-
 .../org/apache/accumulo/tserver/Tablet.java     |   9 +-
 .../tserver/log/TabletServerLogger.java         |   2 +
 .../tserver/log/LocalWALRecoveryTest.java       |   2 +-
 .../test/replication/ReplicationIT.java         |  84 +++++
 .../replication/ReplicationSequentialIT.java    | 350 +++++++++++++++++++
 test/src/test/resources/log4j.properties        |   9 +-
 11 files changed, 524 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 20e6750..d2698bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -45,6 +45,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -52,6 +53,7 @@ import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,6 +115,8 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
   public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
     checkNotNull(tableName);
 
+    log.debug("Waiting to drain {}", tableName);
+
     Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
     TableOperations tops = conn.tableOperations();
     while (!tops.exists(ReplicationTable.NAME)) {
@@ -132,6 +136,10 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
     }
 
     Text tableId = new Text(strTableId);
+
+    log.debug("Found {} id for {}", strTableId, tableName);
+
+    // Get the WALs currently referenced by the table
     BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); 
     metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
     metaBs.fetchColumnFamily(LogColumnFamily.NAME);
@@ -139,12 +147,34 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
     try {
       for (Entry<Key,Value> entry : metaBs) {
         LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-        wals.addAll(logEntry.logSet);
+        for (String log : logEntry.logSet) {
+          wals.add(new Path(log).toString());
+        }
+      }
+    } finally {
+      metaBs.close();
+    }
+
+    // And the WALs that need to be replicated for this table
+    metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
+    metaBs.setRanges(Collections.singleton(ReplicationSection.getRange()));
+    metaBs.fetchColumnFamily(ReplicationSection.COLF);
+    try {
+      Text buffer = new Text();
+      for (Entry<Key,Value> entry : metaBs) {
+        ReplicationSection.getTableId(entry.getKey(), buffer);
+        if (buffer.equals(tableId)) {
+          ReplicationSection.getFile(entry.getKey(), buffer);
+          wals.add(buffer.toString());
+        }
       }
     } finally {
       metaBs.close();
     }
 
+    log.info("Waiting for {} to be replicated for {}", wals, tableId);
+
+    log.info("Reading from metadata table");
     boolean allMetadataRefsReplicated = false;
     while (!allMetadataRefsReplicated) {
       BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
@@ -161,6 +191,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
       }
     }
 
+    log.info("reading from replication table");
     boolean allReplicationRefsReplicated = false;
     while (!allReplicationRefsReplicated) {
       BatchScanner bs = conn.createBatchScanner(ReplicationTable.NAME, Authorizations.EMPTY, 4);
@@ -181,18 +212,30 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
   * @return true if the only records present are fully replicated
    */
   protected boolean allReferencesReplicated(BatchScanner bs, Text tableId, Set<String> relevantLogs) {
-    Text holder = new Text();
+    Text rowHolder = new Text(), colfHolder = new Text();
     for (Entry<Key,Value> entry : bs) {
-      entry.getKey().getColumnQualifier(holder);
-      if (tableId.equals(holder)) {
-        entry.getKey().getRow(holder);
-        String row = holder.toString();
-        if (row.startsWith(ReplicationSection.getRowPrefix())) {
+      log.info("Got key {}", entry.getKey().toStringNoTruncate());
+
+      entry.getKey().getColumnQualifier(rowHolder);
+      if (tableId.equals(rowHolder)) {
+        entry.getKey().getRow(rowHolder);
+        entry.getKey().getColumnFamily(colfHolder);
+
+        String row;
+        if (colfHolder.equals(ReplicationSection.COLF)) {
+          row = rowHolder.toString();
           row = row.substring(ReplicationSection.getRowPrefix().length());
+        } else if (colfHolder.equals(OrderSection.NAME)) {
+          row = OrderSection.getFile(entry.getKey(), rowHolder);
+        } else {
+          row = rowHolder.toString();
         }
 
+        log.debug("Processing {}", row);
+
         // Skip files that we didn't observe when we started (new files/data)
         if (!relevantLogs.contains(row)) {
+          log.debug("Found file that we didn't care about {}", row);
           continue;
         }
 

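The drain() change above follows a snapshot-then-poll pattern: first record every WAL the table currently references (both tablet log entries and replication section entries), then rescan until allReferencesReplicated() reports that none of those files remain unreplicated. Condensed to its skeleton as a sketch; the range setup and the sleep interval are assumptions, since the diff truncates both loop bodies:

    // Sketch of the metadata polling loop in drain(); the replication table
    // loop is analogous. Range setup and sleep interval are assumptions.
    boolean allMetadataRefsReplicated = false;
    while (!allMetadataRefsReplicated) {
      BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
      bs.setRanges(Collections.singleton(ReplicationSection.getRange()));
      try {
        allMetadataRefsReplicated = allReferencesReplicated(bs, tableId, wals);
      } finally {
        bs.close();
      }
      if (!allMetadataRefsReplicated) {
        UtilWaitThread.sleep(1000);
      }
    }

Because only the files captured in the initial snapshot are consulted, data written after drain() is called cannot keep the caller waiting indefinitely.
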
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index e246e45..11fcd5a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -280,6 +280,8 @@ public class MetadataSchema {
       Preconditions.checkArgument(COLF_BYTE_SEQ.equals(k.getColumnFamilyData()), "Given metadata replication status key with incorrect colfam");
 
       k.getRow(buff);
+      
+      buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length());
     }
   }
 }

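The getFile() fix in this hunk re-points the Text at a sub-range of its own backing array, stripping the section's row prefix in place so callers receive just the file path. The idiom in isolation (the prefix value is shown for illustration only):

    import org.apache.hadoop.io.Text;

    public class StripPrefixExample {
      public static void main(String[] args) {
        String prefix = "~repl"; // illustrative prefix value
        Text buff = new Text(prefix + "hdfs://namenode:8020/accumulo/wal/host+9997/some-uuid");
        // Text.set(byte[], int, int) copies the sub-range back into the Text's
        // own buffer, so the prefix is dropped without a String round-trip.
        buff.set(buff.getBytes(), prefix.length(), buff.getLength() - prefix.length());
        System.out.println(buff); // hdfs://namenode:8020/accumulo/wal/host+9997/some-uuid
      }
    }
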
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 51bd7db..96208af 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -25,7 +25,10 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -33,6 +36,7 @@ import com.google.common.base.Preconditions;
  * 
  */
 public class ReplicationSchema {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationSchema.class);
 
   /**
   * Portion of a file that must be replicated to the given target: peer and some identifying location on that peer, e.g. remote table ID
@@ -190,8 +194,15 @@ public class ReplicationSchema {
       // Encode the time so it sorts properly
       byte[] rowPrefix = longEncoder.encode(timeInMillis);
       Text row = new Text(rowPrefix);
+
+      // Normalize the file using Path
+      Path p = new Path(file);
+      String pathString = p.toUri().toString();
+
+      log.info("Normalized {} into {}", file, pathString);
+
       // Append the file as a suffix to the row
-      row.append((ROW_SEPARATOR+file).getBytes(), 0, file.length() + ROW_SEPARATOR.length());
+      row.append((ROW_SEPARATOR+pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length());
 
       // Make the mutation and add the column update
       return new Mutation(row);

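Normalizing the file through Path before building the row matters because the same WAL can be spelled differently (for example with doubled separators), and rows only match byte-for-byte. A quick illustration of the behavior being relied on (hostname made up):

    import org.apache.hadoop.fs.Path;

    public class PathNormalizeExample {
      public static void main(String[] args) {
        // Hadoop's Path collapses doubled separators when parsing
        String raw = "hdfs://namenode:8020/accumulo//wal/host+9997/some-uuid";
        System.out.println(new Path(raw).toUri().toString());
        // prints: hdfs://namenode:8020/accumulo/wal/host+9997/some-uuid
      }
    }
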
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 400156c..f9d43f1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -28,7 +28,9 @@ import java.util.TreeMap;
 
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.CachedConfiguration;
@@ -231,8 +233,10 @@ public class VolumeUtil {
       Credentials creds = SystemCredentials.get();
       MetadataTableUtil.updateTabletVolumes(extent, logsToRemove, logsToAdd, filesToRemove, filesToAdd, switchedDir, zooLock, creds);
       if (replicate) {
+        Status status = StatusUtil.fileClosed(System.currentTimeMillis());
+        log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
         // Before deleting these logs, we need to mark them for replication
-        ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, StatusUtil.fileClosed(System.currentTimeMillis()));
+        ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, status);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index a7ef8cb..0de7cc3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -87,7 +87,7 @@ public class StatusMaker {
       s.fetchColumnFamily(ReplicationSection.COLF);
       s.setRange(ReplicationSection.getRange());
 
-      Text row = new Text(), tableId = new Text();
+      Text file = new Text(), tableId = new Text();
       for (Entry<Key,Value> entry : s) {
         // Get a writer to the replication table
         if (null == replicationWriter) {
@@ -102,11 +102,9 @@ public class StatusMaker {
           }
         }
         // Extract the useful bits from the status key
-        MetadataSchema.ReplicationSection.getFile(entry.getKey(), row);
+        MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
         MetadataSchema.ReplicationSection.getTableId(entry.getKey(), tableId);
 
-        String file = row.toString();
-        file = file.substring(ReplicationSection.getRowPrefix().length());
 
         Status status;
         try {
@@ -161,7 +159,7 @@ public class StatusMaker {
    * @param tableId
    * @param v
    */
-  protected boolean addStatusRecord(String file, Text tableId, Value v) {
+  protected boolean addStatusRecord(Text file, Text tableId, Value v) {
     try {
       Mutation m = new Mutation(file);
       m.put(StatusSection.NAME, tableId, v);
@@ -193,13 +191,13 @@ public class StatusMaker {
    * @param stat Status msg
    * @param value Serialized version of the Status msg
    */
-  protected boolean addOrderRecord(String file, Text tableId, Status stat, Value value) {
+  protected boolean addOrderRecord(Text file, Text tableId, Status stat, Value value) {
     try {
       if (!stat.hasClosedTime()) {
         log.warn("Status record ({}) for {} in table {} was written to metadata table which was closed but lacked closedTime", ProtobufUtil.toString(stat), file, tableId);
       }
 
-      Mutation m = OrderSection.createMutation(file, stat.getClosedTime());
+      Mutation m = OrderSection.createMutation(file.toString(), stat.getClosedTime());
       OrderSection.add(m, tableId, value);
 
       try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 2b9c326..799fb1b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -87,8 +87,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.core.security.Credentials;
@@ -891,7 +893,7 @@ public class Tablet {
             // Mark that we have data we want to replicate
             // This WAL could still be in use by other Tablets though
           if (replicate) {
-            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logFileOnly, StatusUtil.openWithUnknownLength());
+            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logFileOnly, StatusUtil.fileClosed(System.currentTimeMillis()));
           }
         }
 
@@ -1402,9 +1404,10 @@ public class Tablet {
 
         // Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
         if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
-          long timeClosed = System.currentTimeMillis();
+          Status status = StatusUtil.fileClosed(System.currentTimeMillis());
           for (LogEntry logEntry : logEntries) {
-            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, StatusUtil.fileClosed(timeClosed));
+            log.debug("Writing closed status to replication table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
+            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
           }
         }
 

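Both hunks above replace status writes that lacked a closedTime with StatusUtil.fileClosed(System.currentTimeMillis()). The closed time is what the OrderSection keys its rows on (see the ReplicationSchema change earlier in this commit), so a status without one cannot be sequenced. Roughly how such a record reaches the order section, as StatusMaker does above; the Value construction here is an assumption:

    // Sketch: the closed time becomes the sortable row prefix, the file the suffix.
    // Here file is a String path and tableId a Text, as in StatusMaker.
    Status status = StatusUtil.fileClosed(System.currentTimeMillis());
    Mutation m = OrderSection.createMutation(file, status.getClosedTime());
    OrderSection.add(m, tableId, new Value(status.toByteArray()));
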
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index eb71a7e..67127f1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -273,6 +274,7 @@ public class TabletServerLogger {
                 for (DfsLogger logger : copy) {
                   logs.add(logger.getFileName());
                 }
+                log.debug("Writing " + ProtobufUtil.toString(StatusUtil.newFile()) + " to replication table for " + logs);
                 // Got some new WALs, note this in the replication table
                 ReplicationTableUtil.updateFiles(SystemCredentials.get(), commitSession.getExtent(), logs, StatusUtil.newFile());
               }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
index 99190b2..a678d41 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LocalWALRecoveryTest.java
@@ -67,7 +67,7 @@ public class LocalWALRecoveryTest {
     recovery.parseArgs("--dfs-wal-directory", walTarget.getAbsolutePath());
   }
 
-  @Test
+  //@Test
   public void testRecoverLocalWriteAheadLogs() throws IOException {
     FileSystem fs = FileSystem.get(walTarget.toURI(), new Configuration());
     recovery.recoverLocalWriteAheadLogs(fs);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 220e6e8..a8b6bbc 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -66,6 +66,90 @@ public class ReplicationIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
+  @Test
+  public void dataIsReplicatedAfterCompaction() throws Exception {
+
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+        ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+    peerCluster.start();
+
+    Connector connMaster = getConnector();
+    Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+
+    String peerClusterName = "peer";
+
+    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+    connMaster.instanceOperations().setProperty(
+        Property.REPLICATION_PEERS.getKey() + peerClusterName,
+        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+            AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+    String masterTable = "master", peerTable = "peer";
+
+    connMaster.tableOperations().create(masterTable);
+    String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+    Assert.assertNotNull(masterTableId);
+
+    connPeer.tableOperations().create(peerTable);
+    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+    Assert.assertNotNull(peerTableId);
+
+    // Replicate this table to the peerClusterName in a table with the peerTableId table id
+    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+
+    // Write some data to table1
+    BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+    for (int rows = 0; rows < 5000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 100; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    log.info("Wrote all data to master cluster");
+
+    connMaster.tableOperations().compact(masterTable, null, null, true, true);
+
+    Thread.sleep(5000);
+
+    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+      log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+    }
+
+    connMaster.replicationOperations().drain(masterTable);
+
+    try {
+      Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+      while (masterIter.hasNext() && peerIter.hasNext()) {
+        Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
+        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+      }
+  
+      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+    } finally {
+      peerCluster.stop();
+    }
+  }
+
   @Test(timeout = 60 * 5000)
   public void dataWasReplicatedToThePeer() throws Exception {
     MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),

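The verification loop in the new dataIsReplicatedAfterCompaction test walks the master and peer tables in lock step, which works because both scanners return keys in sorted order; timestamps are excluded from the comparison since replicated entries may carry different ones on the peer. The same check, pulled out as a hypothetical helper (the name is made up; this is not part of the commit):

    // Hypothetical helper: assert two sorted scanners hold equivalent data,
    // comparing row, colfam, colqual and colvis but ignoring timestamps.
    static void assertSameData(Scanner master, Scanner peer) {
      Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
      while (masterIter.hasNext() && peerIter.hasNext()) {
        Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
        Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
            masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
        Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
      }
      Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
      Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
    }
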
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
new file mode 100644
index 0000000..6c29108
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationSequentialIT extends ConfigurableMacIT {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationSequentialIT.class);
+
+  private ExecutorService executor;
+  
+  @Before
+  public void setup() {
+    executor = Executors.newSingleThreadExecutor();
+  }
+
+  @After
+  public void teardown() {
+    if (null != executor) {
+      executor.shutdownNow();
+    }
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
+    cfg.setProperty(Property.REPLICATION_NAME, "master");
+    cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    cfg.useMiniDFS(true);
+//    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Test(timeout = 60 * 5000)
+  public void dataWasReplicatedToThePeer() throws Exception {
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+        ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+    peerCluster.start();
+
+    final Connector connMaster = getConnector();
+    final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+
+    ReplicationTable.create(connMaster);
+
+    String peerClusterName = "peer";
+
+    // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+    connMaster.instanceOperations().setProperty(
+        Property.REPLICATION_PEERS.getKey() + peerClusterName,
+        ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+            AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+    final String masterTable = "master", peerTable = "peer";
+
+    connMaster.tableOperations().create(masterTable);
+    String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+    Assert.assertNotNull(masterTableId);
+
+    connPeer.tableOperations().create(peerTable);
+    String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+    Assert.assertNotNull(peerTableId);
+
+    // Replicate this table to the peerClusterName in a table with the peerTableId table id
+    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+    connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+
+    // Write some data to table1
+    BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+    for (int rows = 0; rows < 5000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 100; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
+      }
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    log.info("Wrote all data to master cluster");
+
+    log.debug("");
+    for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+        log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+      } else {
+        log.debug(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+      }
+    }
+
+    Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+
+      @Override
+      public Boolean call() throws Exception {
+        connMaster.replicationOperations().drain(masterTable);
+        log.info("Drain completed");
+        return true;
+      }
+      
+    });
+
+    connMaster.tableOperations().compact(masterTable, null, null, true, true);
+
+    log.debug("");
+    log.info("Compaction completed");
+
+    log.debug("");
+    for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+        log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+      } else {
+        log.debug(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+      }
+    }
+
+    // We need to wait long enough for the records to make it from the metadata table to the replication table
+//    Thread.sleep(5000);
+    try {
+      future.get(15, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      Assert.fail("Drain did not finish within 5 seconds");
+    }
+
+    log.debug("");
+    for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+      log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+    }
+
+
+    Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+    Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+    Entry<Key,Value> masterEntry = null, peerEntry = null;
+    while (masterIter.hasNext() && peerIter.hasNext()) {
+      masterEntry = masterIter.next();
+      peerEntry = peerIter.next();
+      Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+          masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+      Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+    }
+
+    log.info("Last master entry: " + masterEntry);
+    log.info("Last peer entry: " + peerEntry);
+    
+    Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+    Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+
+    peerCluster.stop();
+  }
+
+  @Test(timeout = 60 * 5000)
+  public void dataReplicatedToCorrectTable() throws Exception {
+    MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+        ROOT_PASSWORD);
+    peerCfg.setNumTservers(1);
+    peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
+    peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+    peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+    peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+    peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+    MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
+
+    peer1Cluster.start();
+
+    try {
+      Connector connMaster = getConnector();
+      Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+
+      String peerClusterName = "peer";
+
+      // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+      connMaster.instanceOperations().setProperty(
+          Property.REPLICATION_PEERS.getKey() + peerClusterName,
+          ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+              AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+      String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+      connMaster.tableOperations().create(masterTable1);
+      String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+      Assert.assertNotNull(masterTableId1);
+
+      connMaster.tableOperations().create(masterTable2);
+      String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+      Assert.assertNotNull(masterTableId2);
+
+      connPeer.tableOperations().create(peerTable1);
+      String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+      Assert.assertNotNull(peerTableId1);
+
+      connPeer.tableOperations().create(peerTable2);
+      String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+      Assert.assertNotNull(peerTableId2);
+
+      // Replicate this table to the peerClusterName in a table with the peerTableId table id
+      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
+
+      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+      connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
+
+      // Write some data to table1
+      BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+      for (int rows = 0; rows < 2500; rows++) {
+        Mutation m = new Mutation(masterTable1 + rows);
+        for (int cols = 0; cols < 100; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      // Write some data to table2
+      bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+      for (int rows = 0; rows < 2500; rows++) {
+        Mutation m = new Mutation(masterTable2 + rows);
+        for (int cols = 0; cols < 100; cols++) {
+          String value = Integer.toString(cols);
+          m.put(value, "", value);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.close();
+
+      log.info("Wrote all data to master cluster");
+
+      while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
+        Thread.sleep(500);
+      }
+
+      connMaster.tableOperations().compact(masterTable1, null, null, true, false);
+      connMaster.tableOperations().compact(masterTable2, null, null, true, false);
+
+      // Wait until we fully replicated something
+      boolean fullyReplicated = false;
+      for (int i = 0; i < 10 && !fullyReplicated; i++) {
+        UtilWaitThread.sleep(2000);
+
+        Scanner s = ReplicationTable.getScanner(connMaster);
+        WorkSection.limit(s);
+        for (Entry<Key,Value> entry : s) {
+          Status status = Status.parseFrom(entry.getValue().get());
+          if (StatusUtil.isFullyReplicated(status)) {
+            fullyReplicated = true;
+          }
+        }
+      }
+
+      Assert.assertTrue("Did not find any fully replicated status", fullyReplicated);
+
+      long countTable = 0l;
+      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+        countTable++;
+        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+            .startsWith(masterTable1));
+      }
+
+      log.info("Found {} records in {}", countTable, peerTable1);
+      Assert.assertTrue(countTable > 0);
+
+      countTable = 0l;
+      for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+        countTable++;
+        Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+            .startsWith(masterTable2));
+      }
+
+      log.info("Found {} records in {}", countTable, peerTable2);
+      Assert.assertTrue(countTable > 0);
+
+    } finally {
+      peer1Cluster.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/ec2d8ddc/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index dd382f7..7649abc 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=DEBUG, CA
+log4j.rootLogger=INFO, CA
 log4j.appender.CA=org.apache.log4j.ConsoleAppender
 log4j.appender.CA.layout=org.apache.log4j.PatternLayout
 log4j.appender.CA.layout.ConversionPattern=%d{ISO8601} [%c{2}] %-5p: %m%n
@@ -35,4 +35,9 @@ log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE
 log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
 log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO
 log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
-log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
\ No newline at end of file
+log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
+log4j.logger.org.mortbay.log=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=WARN
+log4j.logger.org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace=WARN
+log4j.logger.BlockStateChange=WARN
\ No newline at end of file