You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:36 UTC

[02/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
new file mode 100644
index 0000000..fb702a2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.master;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.master.thrift.MasterState;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.state.MergeStats;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.master.state.Assignment;
+import org.apache.accumulo.server.master.state.CurrentState;
+import org.apache.accumulo.server.master.state.MergeInfo;
+import org.apache.accumulo.server.master.state.MergeState;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.net.HostAndPort;
+
+public class MergeStateIT extends ConfigurableMacBase {
+
+  private static class MockCurrentState implements CurrentState {
+
+    TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
+    MergeInfo mergeInfo;
+
+    MockCurrentState(MergeInfo info) {
+      this.mergeInfo = info;
+    }
+
+    @Override
+    public Set<String> onlineTables() {
+      return Collections.singleton("t");
+    }
+
+    @Override
+    public Set<TServerInstance> onlineTabletServers() {
+      return Collections.singleton(someTServer);
+    }
+
+    @Override
+    public Collection<MergeInfo> merges() {
+      return Collections.singleton(mergeInfo);
+    }
+
+    @Override
+    public Collection<KeyExtent> migrations() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public MasterState getMasterState() {
+      return MasterState.NORMAL;
+    }
+
+    @Override
+    public Set<TServerInstance> shutdownServers() {
+      return Collections.emptySet();
+    }
+  }
+
+  private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
+    BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    bw.addMutation(m);
+    bw.close();
+  }
+
+  @Test
+  public void test() throws Exception {
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    Connector connector = getConnector();
+    EasyMock.expect(context.getConnector()).andReturn(connector).anyTimes();
+    EasyMock.replay(context);
+    connector.securityOperations().grantTablePermission(connector.whoami(), MetadataTable.NAME, TablePermission.WRITE);
+    BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+    // Create a fake METADATA table with these splits
+    String splits[] = {"a", "e", "j", "o", "t", "z"};
+    // create metadata for a table "t" with the splits above
+    Text tableId = new Text("t");
+    Text pr = null;
+    for (String s : splits) {
+      Text split = new Text(s);
+      Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
+      prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+      ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
+      bw.addMutation(prevRow);
+      pr = split;
+    }
+    // Add the default tablet
+    Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
+    defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
+    bw.addMutation(defaultTablet);
+    bw.close();
+
+    // Read out the TabletLocationStates
+    MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
+
+    // Verify the tablet state: hosted, and count
+    MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state);
+    int count = 0;
+    for (TabletLocationState tss : metaDataStateStore) {
+      if (tss != null)
+        count++;
+    }
+    Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state
+
+    // Create the hole
+    // Split the tablet at one end of the range
+    Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
+    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+    TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
+    update(connector, m);
+
+    // do the state check
+    MergeStats stats = scan(state, metaDataStateStore);
+    MergeState newState = stats.nextMergeState(connector, state);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+
+    // unassign the tablets
+    BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
+    deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
+    deleter.setRanges(Collections.singletonList(new Range()));
+    deleter.delete();
+
+    // now we should be ready to merge but, we have inconsistent metadata
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+    // finish the split
+    KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
+    m = tablet.getPrevRowUpdateMutation();
+    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
+    update(connector, m);
+    metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
+
+    // oh no... there's a new tablet online
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
+
+    // chop it
+    m = tablet.getPrevRowUpdateMutation();
+    ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
+    update(connector, m);
+
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
+
+    // take it offline
+    m = tablet.getPrevRowUpdateMutation();
+    Collection<Collection<String>> walogs = Collections.emptyList();
+    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
+
+    // now we can merge
+    stats = scan(state, metaDataStateStore);
+    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
+
+  }
+
+  private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
+    MergeStats stats = new MergeStats(state.mergeInfo);
+    stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
+    for (TabletLocationState tss : metaDataStateStore) {
+      stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
+    }
+    return stats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
new file mode 100644
index 0000000..5519013
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/FinishedWorkUpdaterIT.java
@@ -0,0 +1,173 @@
+package org.apache.accumulo.test.replication;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.FinishedWorkUpdater;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class FinishedWorkUpdaterIT extends ConfigurableMacBase {
+
+  private Connector conn;
+  private FinishedWorkUpdater updater;
+
+  @Before
+  public void configureUpdater() throws Exception {
+    conn = getConnector();
+    updater = new FinishedWorkUpdater(conn);
+  }
+
+  @Test
+  public void offlineReplicationTableFailsGracefully() {
+    updater.run();
+  }
+
+  @Test
+  public void recordsWithProgressUpdateBothTables() throws Exception {
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+
+    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
+    ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
+
+    // Create a single work record for a file to some peer
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation m = new Mutation(file);
+    WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    bw.close();
+
+    updater.run();
+
+    Scanner s = ReplicationTable.getScanner(conn);
+    s.setRange(Range.exact(file));
+    StatusSection.limit(s);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+    Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+    Assert.assertEquals(target.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+    // We should only rely on the correct begin attribute being returned
+    Status actual = Status.parseFrom(entry.getValue().get());
+    Assert.assertEquals(stat.getBegin(), actual.getBegin());
+  }
+
+  @Test
+  public void chooseMinimumBeginOffset() throws Exception {
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+
+    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    // @formatter:off
+    Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
+        stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
+        stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
+    ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
+        target2 = new ReplicationTarget("peer2", "table2", "1"),
+        target3 = new ReplicationTarget("peer3", "table3", "1");
+    // @formatter:on
+
+    // Create three work records for the same file, one per peer
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation m = new Mutation(file);
+    WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
+    WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
+    WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
+    bw.addMutation(m);
+    bw.close();
+
+    updater.run();
+
+    Scanner s = ReplicationTable.getScanner(conn);
+    s.setRange(Range.exact(file));
+    StatusSection.limit(s);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+    Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+    Assert.assertEquals(target1.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+    // We should only rely on the begin attribute, which must be the minimum across the work records
+    Status actual = Status.parseFrom(entry.getValue().get());
+    Assert.assertEquals(1, actual.getBegin());
+  }
+
+  @Test
+  public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+
+    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    // @formatter:off
+    Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
+        stat2 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
+        stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
+    ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
+        target2 = new ReplicationTarget("peer2", "table2", "1"),
+        target3 = new ReplicationTarget("peer3", "table3", "1");
+    // @formatter:on
+
+    // Create three work records for the same file, one per peer
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation m = new Mutation(file);
+    WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
+    WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
+    WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
+    bw.addMutation(m);
+    bw.close();
+
+    updater.run();
+
+    Scanner s = ReplicationTable.getScanner(conn);
+    s.setRange(Range.exact(file));
+    StatusSection.limit(s);
+    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+    Assert.assertEquals(StatusSection.NAME, entry.getKey().getColumnFamily());
+    Assert.assertEquals(target1.getSourceTableId(), entry.getKey().getColumnQualifier().toString());
+
+    // We should only rely on the begin attribute, which must be the minimum across the work records
+    Status actual = Status.parseFrom(entry.getValue().get());
+    Assert.assertEquals(1, actual.getBegin());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
new file mode 100644
index 0000000..df1f64f
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/RemoveCompleteReplicationRecordsIT.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.RemoveCompleteReplicationRecords;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class RemoveCompleteReplicationRecordsIT extends ConfigurableMacBase {
+
+  private MockRemoveCompleteReplicationRecords rcrr;
+  private Connector conn;
+
+  private static class MockRemoveCompleteReplicationRecords extends RemoveCompleteReplicationRecords {
+
+    public MockRemoveCompleteReplicationRecords(Connector conn) {
+      super(conn);
+    }
+
+    @Override
+    public long removeCompleteRecords(Connector conn, BatchScanner bs, BatchWriter bw) {
+      return super.removeCompleteRecords(conn, bs, bw);
+    }
+
+  }
+
+  @Before
+  public void initialize() throws Exception {
+    conn = getConnector();
+    rcrr = new MockRemoveCompleteReplicationRecords(conn);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+  }
+
+  @Test
+  public void notYetReplicationRecordsIgnored() throws Exception {
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    int numRecords = 3;
+    for (int i = 0; i < numRecords; i++) {
+      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+      Mutation m = new Mutation(file);
+      StatusSection.add(m, new Text(Integer.toString(i)), StatusUtil.openWithUnknownLengthValue());
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+    bs.setRanges(Collections.singleton(new Range()));
+    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    bs.addScanIterator(cfg);
+    bw = EasyMock.createMock(BatchWriter.class);
+
+    EasyMock.replay(bw);
+
+    rcrr.removeCompleteRecords(conn, bs, bw);
+    bs.close();
+
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+  }
+
+  @Test
+  public void partiallyReplicatedRecordsIgnored() throws Exception {
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    int numRecords = 3;
+    Status.Builder builder = Status.newBuilder();
+    builder.setClosed(false);
+    builder.setEnd(10000);
+    builder.setInfiniteEnd(false);
+    for (int i = 0; i < numRecords; i++) {
+      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+      Mutation m = new Mutation(file);
+      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+      bw.addMutation(m);
+    }
+
+    bw.close();
+
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+    bs.setRanges(Collections.singleton(new Range()));
+    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    bs.addScanIterator(cfg);
+    bw = EasyMock.createMock(BatchWriter.class);
+
+    EasyMock.replay(bw);
+
+    // We don't remove any records, so we can just pass in a mock BatchWriter
+    rcrr.removeCompleteRecords(conn, bs, bw);
+    bs.close();
+
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+  }
+
+  @Test
+  public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception {
+    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+    int numRecords = 3;
+
+    Status.Builder builder = Status.newBuilder();
+    builder.setClosed(false);
+    builder.setEnd(10000);
+    builder.setInfiniteEnd(false);
+
+    // Write out numRecords status entries to the replication table, none of which are fully replicated
+    for (int i = 0; i < numRecords; i++) {
+      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+      Mutation m = new Mutation(file);
+      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+      replBw.addMutation(m);
+    }
+
+    // Add two fully replicated (begin == end) records that are not closed, so they must not be deleted
+    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    Mutation m = new Mutation(fileToRemove);
+    StatusSection.add(m, new Text("5"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
+    replBw.addMutation(m);
+
+    numRecords++;
+
+    fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    m = new Mutation(fileToRemove);
+    StatusSection.add(m, new Text("6"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
+    replBw.addMutation(m);
+
+    numRecords++;
+
+    replBw.flush();
+
+    // Make sure that we have the expected number of records in the replication table
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+    // We should not remove any records because they're missing closed status
+    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+    bs.setRanges(Collections.singleton(new Range()));
+    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    bs.addScanIterator(cfg);
+
+    try {
+      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
+    } finally {
+      bs.close();
+      replBw.close();
+    }
+  }
+
+  @Test
+  public void replicatedClosedRowsAreRemoved() throws Exception {
+    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+    int numRecords = 3;
+
+    Status.Builder builder = Status.newBuilder();
+    builder.setClosed(false);
+    builder.setEnd(10000);
+    builder.setInfiniteEnd(false);
+
+    long time = System.currentTimeMillis();
+    // Write out a status and an order entry per file to the replication table, none of which are fully replicated
+    for (int i = 0; i < numRecords; i++) {
+      builder.setCreatedTime(time++);
+      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+      Mutation m = new Mutation(file);
+      Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build());
+      StatusSection.add(m, new Text(Integer.toString(i)), v);
+      replBw.addMutation(m);
+      m = OrderSection.createMutation(file, time);
+      OrderSection.add(m, new Text(Integer.toString(i)), v);
+      replBw.addMutation(m);
+    }
+
+    Set<String> filesToRemove = new HashSet<>();
+    // We created two mutations for each file
+    numRecords *= 2;
+    int finalNumRecords = numRecords;
+
+    // Add two records that we can delete
+    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    filesToRemove.add(fileToRemove);
+    Mutation m = new Mutation(fileToRemove);
+    ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
+    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
+    StatusSection.add(m, new Text("5"), value);
+    WorkSection.add(m, target.toText(), value);
+    replBw.addMutation(m);
+
+    m = OrderSection.createMutation(fileToRemove, time);
+    OrderSection.add(m, new Text("5"), value);
+    replBw.addMutation(m);
+    time++;
+
+    numRecords += 3;
+
+    fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    filesToRemove.add(fileToRemove);
+    m = new Mutation(fileToRemove);
+    value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
+    target = new ReplicationTarget("peer1", "6", "6");
+    StatusSection.add(m, new Text("6"), value);
+    WorkSection.add(m, target.toText(), value);
+    replBw.addMutation(m);
+
+    m = OrderSection.createMutation(fileToRemove, time);
+    OrderSection.add(m, new Text("6"), value);
+    replBw.addMutation(m);
+    time++;
+
+    numRecords += 3;
+
+    replBw.flush();
+
+    // Make sure that we have the expected number of records in the replication table
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+    // We should remove the two fully completed records we inserted
+    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+    bs.setRanges(Collections.singleton(new Range()));
+    StatusSection.limit(bs);
+    WorkSection.limit(bs);
+    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    bs.addScanIterator(cfg);
+
+    try {
+      Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, bs, replBw));
+    } finally {
+      bs.close();
+      replBw.close();
+    }
+
+    int actualRecords = 0;
+    for (Entry<Key,Value> entry : ReplicationTable.getScanner(conn)) {
+      Assert.assertFalse(filesToRemove.contains(entry.getKey().getRow().toString()));
+      actualRecords++;
+    }
+
+    Assert.assertEquals(finalNumRecords, actualRecords);
+  }
+
+  @Test
+  public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception {
+    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
+    int numRecords = 3;
+
+    Status.Builder builder = Status.newBuilder();
+    builder.setClosed(false);
+    builder.setEnd(10000);
+    builder.setInfiniteEnd(false);
+
+    // Write out numRecords status entries to the replication table, none of which are fully replicated
+    for (int i = 0; i < numRecords; i++) {
+      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+      Mutation m = new Mutation(file);
+      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
+      replBw.addMutation(m);
+    }
+
+    // Add one row with a closed status entry and three work entries, the last of which is not closed
+    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
+    Mutation m = new Mutation(fileToRemove);
+    ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
+    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build());
+    StatusSection.add(m, new Text("5"), value);
+    WorkSection.add(m, target.toText(), value);
+    target = new ReplicationTarget("peer2", "5", "5");
+    WorkSection.add(m, target.toText(), value);
+    target = new ReplicationTarget("peer3", "5", "5");
+    WorkSection.add(m, target.toText(), ProtobufUtil.toValue(builder.setClosed(false).build()));
+    replBw.addMutation(m);
+
+    numRecords += 4;
+
+    replBw.flush();
+
+    // Make sure that we have the expected number of records in the replication table
+    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
+
+    // We should not remove anything: the row still has a work entry that is not closed
+    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
+    bs.setRanges(Collections.singleton(new Range()));
+    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
+    bs.addScanIterator(cfg);
+
+    try {
+      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
+    } finally {
+      bs.close();
+      replBw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
new file mode 100644
index 0000000..fa026d1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/ReplicationOperationsImplIT.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.ReplicationOperationsImpl;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.master.MasterClientServiceHandler;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.apache.thrift.TException;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationOperationsImplIT extends ConfigurableMacBase {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplIT.class);
+
+  private Instance inst;
+  private Connector conn;
+
+  @Before // per-test setup: caches the connector/instance and prepares the tables the tests write to
+  public void configureInstance() throws Exception {
+    conn = getConnector();
+    inst = conn.getInstance();
+    ReplicationTable.setOnline(conn); // the tests below write status records straight into the replication table
+    conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); // tests add replication-section rows to the metadata table
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+  }
+
+  /**
+   * Spoof out the Master: the returned ReplicationOperationsImpl forwards getMasterDrain to a MasterClientServiceHandler built around an EasyMock Master that hands back this test's Connector and Instance.
+   */
+  private ReplicationOperationsImpl getReplicationOperations() throws Exception {
+    Master master = EasyMock.createMock(Master.class);
+    EasyMock.expect(master.getConnector()).andReturn(conn).anyTimes();
+    EasyMock.expect(master.getInstance()).andReturn(inst).anyTimes();
+    EasyMock.replay(master);
+
+    final MasterClientServiceHandler mcsh = new MasterClientServiceHandler(master) {
+      @Override
+      protected String getTableId(Instance inst, String tableName) throws ThriftTableOperationException {
+        try {
+          return conn.tableOperations().tableIdMap().get(tableName); // resolve the id through the test's connector
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    ClientContext context = new ClientContext(inst, new Credentials("root", new PasswordToken(ROOT_PASSWORD)), getClientConfig());
+    return new ReplicationOperationsImpl(context) {
+      @Override
+      protected boolean getMasterDrain(final TInfo tinfo, final TCredentials rpcCreds, final String tableName, final Set<String> wals)
+          throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+        try {
+          return mcsh.drainReplicationTable(tinfo, rpcCreds, tableName, wals); // call the handler directly in this JVM
+        } catch (TException e) {
+          throw new RuntimeException(e); // TException is not in this override's throws clause, so rethrow unchecked
+        }
+      }
+    };
+  }
+
+  @Test // drain("foo") should only complete once the metadata and replication records of both WALs are gone
+  public void waitsUntilEntriesAreReplicated() throws Exception {
+    conn.tableOperations().create("foo");
+    Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    StatusSection.add(m, tableId, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+
+    bw.addMutation(m);
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.put(ReplicationSection.COLF, tableId, ProtobufUtil.toValue(stat));
+    bw.addMutation(m); // without this the file2 metadata record is built but never written
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    final ReplicationOperationsImpl roi = getReplicationOperations();
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true); // set on success and on failure; the exception flag tells the two apart
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId);
+    bw.addMutation(m);
+    bw.flush();
+
+    Assert.assertFalse(done.get());
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.putDelete(ReplicationSection.COLF, tableId);
+    bw.addMutation(m);
+    bw.flush();
+    bw.close();
+
+    // Removing metadata entries doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Remove the replication entries too
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId);
+    bw.addMutation(m);
+    bw.flush();
+
+    Assert.assertFalse(done.get());
+
+    m = new Mutation(file2);
+    m.putDelete(StatusSection.NAME, tableId);
+    bw.addMutation(m);
+    bw.close(); // flushes the last delete and releases the writer, which was previously leaked
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // After both the metadata and the replication records are removed, drain should have returned cleanly
+    Assert.assertTrue("Drain never finished", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  @Test // records that belong to table "bar" must not keep drain("foo") from completing
+  public void unrelatedReplicationRecordsDontBlockDrain() throws Exception {
+    conn.tableOperations().create("foo");
+    conn.tableOperations().create("bar");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+    Text tableId2 = new Text(conn.tableOperations().tableIdMap().get("bar"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    StatusSection.add(m, tableId2, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+
+    bw.addMutation(m);
+
+    m = new Mutation(ReplicationSection.getRowPrefix() + file2);
+    m.put(ReplicationSection.COLF, tableId2, ProtobufUtil.toValue(stat));
+    bw.addMutation(m); // without this the unrelated ("bar") metadata record is built but never written
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+
+    final ReplicationOperationsImpl roi = getReplicationOperations();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true); // set on success and on failure; the exception flag tells the two apart
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId1);
+    bw.addMutation(m);
+    bw.close(); // flush and release: this writer is replaced below and was previously leaked
+
+    // Removing metadata entries doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Remove the replication entries too (only file1's; file2 belongs to "bar" and is left in place)
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId1);
+    bw.addMutation(m);
+    bw.close(); // flushes the delete and releases the writer, which was previously leaked
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // After both the metadata and the replication records for "foo" are removed, drain should have returned cleanly
+    Assert.assertTrue("Drain never completed", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  @Test // a record that is rewritten but still not fully replicated must keep drain("foo") blocked
+  public void inprogressReplicationRecordsBlockExecution() throws Exception {
+    conn.tableOperations().create("foo");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    bw.close();
+
+    LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId1), null, null), System.currentTimeMillis(), "tserver", file1);
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    m = new Mutation(logEntry.getRow());
+    m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    final ReplicationOperationsImpl roi = getReplicationOperations();
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true); // set on success and on failure; the exception flag tells the two apart
+      }
+    });
+
+    t.start();
+
+    // With the records, we shouldn't be drained
+    Assert.assertFalse(done.get());
+
+    Status newStatus = Status.newBuilder().setBegin(1000).setEnd(2000).setInfiniteEnd(false).setClosed(true).build();
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(newStatus));
+    bw.addMutation(m);
+    bw.close(); // flush and release: this writer is replaced below and was previously leaked
+
+    // Rewriting the metadata entry with a closed but only partially replicated status doesn't change anything
+    Assert.assertFalse(done.get());
+
+    // Write the same updated status to the replication table too
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus));
+    bw.addMutation(m);
+    bw.close(); // flushes the update and releases the writer, which was previously leaked
+
+    try {
+      t.join(5000); // returns after the timeout when drain is still blocked, which is the expected outcome here
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+
+    // New records, but not fully replicated ones don't cause it to complete
+    Assert.assertFalse("Drain somehow finished", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+
+  @Test // a WAL recorded after drain("foo") has started must not be added to the set drain waits on
+  public void laterCreatedLogsDontBlockExecution() throws Exception {
+    conn.tableOperations().create("foo");
+
+    Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo"));
+
+    String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build();
+
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation m = new Mutation(file1);
+    StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    bw.close();
+
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+
+    bw.close();
+
+    log.info("Reading metadata first time");
+    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      log.info("{}", e.getKey());
+    }
+
+    final AtomicBoolean done = new AtomicBoolean(false);
+    final AtomicBoolean exception = new AtomicBoolean(false);
+    final ReplicationOperationsImpl roi = getReplicationOperations();
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          roi.drain("foo");
+        } catch (Exception e) {
+          log.error("Got error", e);
+          exception.set(true);
+        }
+        done.set(true); // set on success and on failure; the exception flag tells the two apart
+      }
+    });
+
+    t.start();
+
+    // We need to wait long enough for the table to read once
+    Thread.sleep(2000);
+
+    // Write another file, but also delete the old files
+    bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    m = new Mutation(ReplicationSection.getRowPrefix() + "/accumulo/wals/tserver+port/" + UUID.randomUUID());
+    m.put(ReplicationSection.COLF, tableId1, ProtobufUtil.toValue(stat));
+    bw.addMutation(m);
+    m = new Mutation(ReplicationSection.getRowPrefix() + file1);
+    m.putDelete(ReplicationSection.COLF, tableId1);
+    bw.addMutation(m);
+    bw.close();
+
+    log.info("Reading metadata second time");
+    for (Entry<Key,Value> e : conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+      log.info("{}", e.getKey());
+    }
+
+    bw = ReplicationTable.getBatchWriter(conn);
+    m = new Mutation(file1);
+    m.putDelete(StatusSection.NAME, tableId1);
+    bw.addMutation(m);
+    bw.close();
+
+    try {
+      t.join(5000);
+    } catch (InterruptedException e) {
+      Assert.fail("ReplicationOperations.drain did not complete");
+    }
+    // We should pass immediately because we aren't waiting on both files to be deleted (just the one that we did)
+    Assert.assertTrue("Drain didn't finish", done.get());
+    Assert.assertFalse("Saw unexpected exception", exception.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
new file mode 100644
index 0000000..5668a67
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SequentialWorkAssignerIT extends ConfigurableMacBase {
+
+  private Connector conn;
+  private MockSequentialWorkAssigner assigner;
+
+  private static class MockSequentialWorkAssigner extends SequentialWorkAssigner { // test subclass: every override is public and only delegates to super
+
+    public MockSequentialWorkAssigner(Connector conn) {
+      super(null, conn); // NOTE(review): first superclass argument is passed as null — presumably the configuration; confirm these tests never need it
+    }
+
+    @Override
+    public void setConnector(Connector conn) {
+      super.setConnector(conn);
+    }
+
+    @Override
+    public void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+      super.setQueuedWork(queuedWork);
+    }
+
+    @Override
+    public void setWorkQueue(DistributedWorkQueue workQueue) {
+      super.setWorkQueue(workQueue);
+    }
+
+    @Override
+    public void setMaxQueueSize(int maxQueueSize) {
+      super.setMaxQueueSize(maxQueueSize);
+    }
+
+    @Override
+    public void createWork() {
+      super.createWork();
+    }
+
+    @Override
+    public void setZooCache(ZooCache zooCache) {
+      super.setZooCache(zooCache);
+    }
+
+    @Override
+    public void cleanupFinishedWork() {
+      super.cleanupFinishedWork();
+    }
+
+  }
+
+  @Before // per-test setup: fresh assigner on the test connector, replication table readable, writable and online
+  public void init() throws Exception {
+    conn = getConnector();
+    assigner = new MockSequentialWorkAssigner(conn);
+    // grant ourselves read and write on the replication table
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.READ);
+    conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, TablePermission.WRITE);
+    ReplicationTable.setOnline(conn);
+  }
+
+  @Test // two files for the same target: only file1 (earlier created time) is expected to be queued
+  public void createWorkForFilesInCorrectOrder() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget = target.toText();
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was created before file2, however (created time 250 vs 500)
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Expect exactly one addWork call, for file1, even though file2's name sorts first
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1);
+    expectLastCall().once();
+
+    // file2 is *not* queued because file1 must be replicated first
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
+  }
+
+  @Test // files for two different source tables on the same peer are both expected to be queued
+  public void workAcrossTablesHappensConcurrently() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget1 = target1.toText();
+
+    ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget2 = target2.toText();
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was created before file2, however (created time 250 vs 500)
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Expect one addWork call per file
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
+    expectLastCall().once();
+
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
+    expectLastCall().once();
+
+    // file2 *is* queued as well: it belongs to a different source table, so it is not held back behind file1
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(2, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
+  }
+
+  @Test // files of the same source table destined for two different peers are both expected to be queued
+  public void workAcrossPeersHappensConcurrently() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget1 = target1.toText();
+
+    ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1");
+    Text serializedTarget2 = target2.toText();
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was created before file2, however (created time 250 vs 500)
+    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Expect one addWork call per file
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
+    expectLastCall().once();
+
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
+    expectLastCall().once();
+
+    // file2 *is* queued as well: it is destined for a different peer (cluster2), so it is not held back behind file1
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(2, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
+
+    Map<String,String> cluster2Work = queuedWork.get("cluster2");
+    Assert.assertEquals(1, cluster2Work.size());
+    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
+  }
+
+  @Test // file1 is pre-registered as queued but its status has begin == end; file2 is then expected to take its place
+  public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget = target.toText();
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was created before file2 (created time 250 vs 500) and, unlike file2, has begin == end
+    Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+    // Treat filename1 as we have already submitted it for replication
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,String> queuedWorkForCluster = new HashMap<>();
+    queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
+    queuedWork.put("cluster1", queuedWorkForCluster);
+
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Expect exactly one addWork call, for file2
+    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2);
+    expectLastCall().once();
+
+    // file2 is queued because we remove file1 because it's fully replicated
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
new file mode 100644
index 0000000..cb34ed2
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/StatusMakerIT.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.StatusMaker;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+public class StatusMakerIT extends ConfigurableMacBase {
+
+  private Connector conn;
+
+  /**
+   * Prepares each test: obtains a connector, calls {@link ReplicationTable#setOnline(Connector)} and grants the connected user WRITE and READ permission on
+   * the replication table.
+   */
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    ReplicationTable.setOnline(conn);
+    // Grant write access first, then read access
+    for (TablePermission permission : new TablePermission[] {TablePermission.WRITE, TablePermission.READ}) {
+      conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, permission);
+    }
+  }
+
+  /**
+   * Writes one file-created replication entry per WAL into the source table, runs the {@link StatusMaker} and checks that every status record then found in
+   * the replication table refers to one of those WALs and carries the table id and creation time it was written with. Also checks that exactly one status
+   * record per WAL was found.
+   */
+  @Test
+  public void statusRecordsCreated() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+    Map<String,Integer> fileToTableId = new HashMap<>();
+
+    // Every file gets a distinct table id (1..n) and creation time (0..n-1) so that mixed-up records are detectable
+    int index = 1;
+    long timeCreated = 0;
+    Map<String,Long> fileToTimeCreated = new HashMap<>();
+    for (String file : files) {
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
+      fileToTimeCreated.put(file, timeCreated);
+      bw.addMutation(m);
+      fileToTableId.put(file, index);
+      index++;
+      timeCreated++;
+    }
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Text file = new Text(), tableId = new Text();
+    int recordsSeen = 0;
+    for (Entry<Key,Value> entry : s) {
+      StatusSection.getFile(entry.getKey(), file);
+      StatusSection.getTableId(entry.getKey(), tableId);
+      String fileName = file.toString();
+
+      Assert.assertTrue("Found unexpected file: " + file, files.contains(fileName));
+      Assert.assertEquals(fileToTableId.get(fileName), Integer.valueOf(tableId.toString()));
+      // Keep the lookup boxed: unboxing a missing entry into a long would throw a NullPointerException before the null check could report it
+      Long expectedTimeCreated = fileToTimeCreated.get(fileName);
+      Assert.assertNotNull("No creation time recorded for file: " + fileName, expectedTimeCreated);
+      Assert.assertEquals(StatusUtil.fileCreated(expectedTimeCreated), Status.parseFrom(entry.getValue().get()));
+      recordsSeen++;
+    }
+
+    // Without this the test would also pass if no status record had been created at all
+    Assert.assertEquals("Unexpected number of status records in replication table", files.size(), recordsSeen);
+  }
+
+  /**
+   * Writes one file-created (not closed) replication entry per WAL into the source table, runs the {@link StatusMaker} and checks that the source table still
+   * holds as many replication entries as were written.
+   */
+  @Test
+  public void openMessagesAreNotDeleted() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+
+    // Column qualifier is a per-file table id (1..n); creation time increases with each file (0..n-1)
+    int index = 1;
+    long timeCreated = 0;
+    for (String file : files) {
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
+      bw.addMutation(m);
+      index++;
+      timeCreated++;
+    }
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    // Only the count is checked: one surviving entry per written file
+    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(files.size(), Iterables.size(s));
+  }
+
+  /**
+   * Writes one closed replication entry per WAL into the source table, runs the {@link StatusMaker} and checks that no replication entries remain in the
+   * source table afterwards. On failure the assertion message lists the entries that were left behind.
+   */
+  @Test
+  public void closedMessagesAreDeleted() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+
+    // The same closed status is written for every file
+    Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build();
+
+    // Column qualifier is a per-file table id (1..n)
+    int index = 1;
+    for (String file : files) {
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(stat));
+      bw.addMutation(m);
+      index++;
+    }
+
+    bw.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    s.setRange(ReplicationSection.getRange());
+    s.fetchColumnFamily(ReplicationSection.COLF);
+
+    // Collect whatever is left so that a failure message lists the offending entries
+    StringBuilder remaining = new StringBuilder();
+    int remainingCount = 0;
+    for (Entry<Key,Value> e : s) {
+      remaining.append('\n').append(e.getKey().toStringNoTruncate()).append(' ').append(e.getValue());
+      remainingCount++;
+    }
+    Assert.assertEquals("Closed replication entries were left in the source table:" + remaining, 0, remainingCount);
+  }
+
+  /**
+   * Writes closed replication entries with increasing creation times into the source table, runs the {@link StatusMaker} and checks two things: the entries
+   * are gone from the source table, and the order records in the replication table come back in the sequence the files were written, each with its table id.
+   */
+  @Test
+  public void closedMessagesCreateOrderRecords() throws Exception {
+    String sourceTable = testName.getMethodName();
+    conn.tableOperations().create(sourceTable);
+    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
+
+    BatchWriter writer = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
+    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
+    List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
+        walPrefix + UUID.randomUUID());
+    Map<String,Integer> fileToTableId = new HashMap<>();
+
+    Status.Builder closedStatus = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
+
+    // File i (0-based) is written with table id i + 1 and creation time start + i
+    long firstCreatedTime = System.currentTimeMillis();
+    for (int i = 0; i < files.size(); i++) {
+      String file = files.get(i);
+      int tableId = i + 1;
+      closedStatus.setCreatedTime(firstCreatedTime + i);
+
+      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
+      m.put(ReplicationSection.COLF, new Text(Integer.toString(tableId)), ProtobufUtil.toValue(closedStatus.build()));
+      writer.addMutation(m);
+      fileToTableId.put(file, tableId);
+    }
+
+    writer.close();
+
+    StatusMaker statusMaker = new StatusMaker(conn);
+    statusMaker.setSourceTableName(sourceTable);
+
+    statusMaker.run();
+
+    // The closed entries must no longer be present in the source table
+    Scanner sourceScanner = conn.createScanner(sourceTable, Authorizations.EMPTY);
+    sourceScanner.setRange(ReplicationSection.getRange());
+    sourceScanner.fetchColumnFamily(ReplicationSection.COLF);
+    Assert.assertEquals(0, Iterables.size(sourceScanner));
+
+    Scanner orderScanner = ReplicationTable.getScanner(conn);
+    OrderSection.limit(orderScanner);
+    Iterator<Entry<Key,Value>> orderRecords = orderScanner.iterator();
+    Assert.assertTrue("Found no order records in replication table", orderRecords.hasNext());
+
+    // Walk the expected files and the order records in lock-step
+    Iterator<String> expectedFiles = files.iterator();
+    Text buff = new Text();
+    while (expectedFiles.hasNext() && orderRecords.hasNext()) {
+      String expectedFile = expectedFiles.next();
+      Key orderKey = orderRecords.next().getKey();
+
+      Assert.assertEquals(expectedFile, OrderSection.getFile(orderKey, buff));
+      OrderSection.getTableId(orderKey, buff);
+      int expectedTableId = fileToTableId.get(expectedFile).intValue();
+      Assert.assertEquals(expectedTableId, Integer.parseInt(buff.toString()));
+    }
+
+    // Both sides must be exhausted at the same time
+    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
+    Assert.assertFalse("Found more entries in replication table unexpectedly", orderRecords.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
new file mode 100644
index 0000000..048fa94
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerIT.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.replication.UnorderedWorkAssigner;
+import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnorderedWorkAssignerIT extends ConfigurableMacBase {
+
+  private Connector conn;
+  private MockUnorderedWorkAssigner assigner;
+
+  /**
+   * Test subclass of {@link UnorderedWorkAssigner}. Despite the name, nothing is stubbed: every override simply delegates to the superclass implementation.
+   * Presumably the methods are re-declared here so that this test, which lives in a different package than {@link UnorderedWorkAssigner}, can call them --
+   * confirm against the visibility of the superclass methods.
+   */
+  private static class MockUnorderedWorkAssigner extends UnorderedWorkAssigner {
+    /**
+     * Creates the assigner with the given connector; the first superclass constructor argument is passed as {@code null}.
+     */
+    public MockUnorderedWorkAssigner(Connector conn) {
+      super(null, conn);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void setQueuedWork(Set<String> queuedWork) {
+      super.setQueuedWork(queuedWork);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void setWorkQueue(DistributedWorkQueue workQueue) {
+      super.setWorkQueue(workQueue);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected boolean queueWork(Path path, ReplicationTarget target) {
+      return super.queueWork(path, target);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void initializeQueuedWork() {
+      super.initializeQueuedWork();
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected Set<String> getQueuedWork() {
+      return super.getQueuedWork();
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void setConnector(Connector conn) {
+      super.setConnector(conn);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void setMaxQueueSize(int maxQueueSize) {
+      super.setMaxQueueSize(maxQueueSize);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void createWork() {
+      super.createWork();
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void setZooCache(ZooCache zooCache) {
+      super.setZooCache(zooCache);
+    }
+
+    // Delegates to the superclass unchanged
+    @Override
+    protected void cleanupFinishedWork() {
+      super.cleanupFinishedWork();
+    }
+  }
+
+  /**
+   * Prepares each test: obtains a connector, builds the assigner under test around it, calls {@link ReplicationTable#setOnline(Connector)} and grants the
+   * connected user WRITE and READ permission on the replication table.
+   */
+  @Before
+  public void init() throws Exception {
+    conn = getConnector();
+    assigner = new MockUnorderedWorkAssigner(conn);
+    ReplicationTable.setOnline(conn);
+    // Grant write access first, then read access
+    for (TablePermission permission : new TablePermission[] {TablePermission.WRITE, TablePermission.READ}) {
+      conn.securityOperations().grantTablePermission(conn.whoami(), ReplicationTable.NAME, permission);
+    }
+  }
+
+  /**
+   * Writes work and order entries for two WAL files, each with an open, infinite-end status for a different target, and checks that {@code createWork()}
+   * makes exactly one {@code addWork} call per file on the mocked work queue, using the key {@code <filename>|<peer>|<remote id>|<source table id>}.
+   */
+  @Test
+  public void createWorkForFilesNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
+    ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget1 = target1.toText();
+    Text serializedTarget2 = target2.toText();
+
+    // Target portion of the expected queue keys: peer name, remote identifier and source table id joined by the key separator
+    String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier()
+        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId();
+    String keyTarget2 = target2.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier()
+        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getSourceTableId();
+
+    // Two statuses that differ only in their creation time
+    Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5L);
+    Status status1 = builder.build();
+    builder.setCreatedTime(10L);
+    Status status2 = builder.build();
+
+    String filename1 = UUID.randomUUID().toString();
+    String filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1;
+    String file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // For each file write a work entry followed by the matching order entry
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    Mutation workEntry1 = new Mutation(file1);
+    WorkSection.add(workEntry1, serializedTarget1, ProtobufUtil.toValue(status1));
+    bw.addMutation(workEntry1);
+    Mutation orderEntry1 = OrderSection.createMutation(file1, status1.getCreatedTime());
+    OrderSection.add(orderEntry1, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1));
+    bw.addMutation(orderEntry1);
+
+    Mutation workEntry2 = new Mutation(file2);
+    WorkSection.add(workEntry2, serializedTarget2, ProtobufUtil.toValue(status2));
+    bw.addMutation(workEntry2);
+    Mutation orderEntry2 = OrderSection.createMutation(file2, status2.getCreatedTime());
+    OrderSection.add(orderEntry2, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2));
+    bw.addMutation(orderEntry2);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    HashSet<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // Expect exactly one addWork call for each of the two files
+    workQueue.addWork(filename1 + "|" + keyTarget1, file1);
+    expectLastCall().once();
+
+    workQueue.addWork(filename2 + "|" + keyTarget2, file2);
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+
+  /**
+   * Writes work entries for two WAL files whose status is a plain file-created record and checks that {@code createWork()} makes no call at all on the work
+   * queue handed to the assigner.
+   */
+  @Test
+  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
+    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
+
+    // Create two mutations, neither of which is expected to result in replication work
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    // Hand the mock to the assigner; otherwise replay/verify below would not observe anything the assigner does
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // No expectations are recorded, so any call the assigner makes on the work queue fails the test
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+
+  /**
+   * Seeds the assigner's queued-work set with the key for {@code wal1}, writes a work entry for that same file and target, and checks that
+   * {@code createWork()} makes no call on the mocked work queue.
+   */
+  @Test
+  public void workNotReAdded() throws Exception {
+    Set<String> alreadyQueued = new HashSet<>();
+
+    assigner.setQueuedWork(alreadyQueued);
+
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    String targetKeySuffix = target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
+        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
+
+    // Pretend wal1 has already been queued for this target
+    alreadyQueued.add("wal1|" + targetKeySuffix);
+
+    // Write a single work entry for wal1 with an open, unknown-length status
+    BatchWriter writer = ReplicationTable.getBatchWriter(conn);
+    String walPath = "/accumulo/wal/tserver+port/wal1";
+    Mutation workEntry = new Mutation(walPath);
+    WorkSection.add(workEntry, target.toText(), StatusUtil.openWithUnknownLengthValue());
+    writer.addMutation(workEntry);
+
+    writer.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // No expectations are recorded on the mock before it is replayed
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+  }
+}