You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2015/07/30 23:51:39 UTC

[05/14] accumulo git commit: ACCUMULO-3920 Convert more tests from mock

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
deleted file mode 100644
index a8fe771..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchDeleter;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.master.thrift.MasterState;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.master.state.MergeStats;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.master.state.Assignment;
-import org.apache.accumulo.server.master.state.CurrentState;
-import org.apache.accumulo.server.master.state.MergeInfo;
-import org.apache.accumulo.server.master.state.MergeState;
-import org.apache.accumulo.server.master.state.MetaDataStateStore;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.state.TabletLocationState;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.net.HostAndPort;
-
-public class TestMergeState {
-
-  class MockCurrentState implements CurrentState {
-
-    TServerInstance someTServer = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 1234), 0x123456);
-    MergeInfo mergeInfo;
-
-    MockCurrentState(MergeInfo info) {
-      this.mergeInfo = info;
-    }
-
-    @Override
-    public Set<String> onlineTables() {
-      return Collections.singleton("t");
-    }
-
-    @Override
-    public Set<TServerInstance> onlineTabletServers() {
-      return Collections.singleton(someTServer);
-    }
-
-    @Override
-    public Collection<MergeInfo> merges() {
-      return Collections.singleton(mergeInfo);
-    }
-
-    @Override
-    public Collection<KeyExtent> migrations() {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public MasterState getMasterState() {
-      return MasterState.NORMAL;
-    }
-
-    @Override
-    public Set<TServerInstance> shutdownServers() {
-      return Collections.emptySet();
-    }
-  }
-
-  private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
-    BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    bw.addMutation(m);
-    bw.close();
-  }
-
-  @Rule
-  public TestName test = new TestName();
-
-  @Test
-  public void test() throws Exception {
-    Instance instance = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(instance));
-    Connector connector = context.getConnector();
-    BatchWriter bw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-
-    // Create a fake METADATA table with these splits
-    String splits[] = {"a", "e", "j", "o", "t", "z"};
-    // create metadata for a table "t" with the splits above
-    Text tableId = new Text("t");
-    Text pr = null;
-    for (String s : splits) {
-      Text split = new Text(s);
-      Mutation prevRow = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, split, pr));
-      prevRow.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
-      ChoppedColumnFamily.CHOPPED_COLUMN.put(prevRow, new Value("junk".getBytes()));
-      bw.addMutation(prevRow);
-      pr = split;
-    }
-    // Add the default tablet
-    Mutation defaultTablet = KeyExtent.getPrevRowUpdateMutation(new KeyExtent(tableId, null, pr));
-    defaultTablet.put(TabletsSection.CurrentLocationColumnFamily.NAME, new Text("123456"), new Value("127.0.0.1:1234".getBytes()));
-    bw.addMutation(defaultTablet);
-    bw.close();
-
-    // Read out the TabletLocationStates
-    MockCurrentState state = new MockCurrentState(new MergeInfo(new KeyExtent(tableId, new Text("p"), new Text("e")), MergeInfo.Operation.MERGE));
-
-    // Verify the tablet state: hosted, and count
-    MetaDataStateStore metaDataStateStore = new MetaDataStateStore(context, state);
-    int count = 0;
-    for (TabletLocationState tss : metaDataStateStore) {
-      if (tss != null)
-        count++;
-    }
-    Assert.assertEquals(0, count); // the normal case is to skip tablets in a good state
-
-    // Create the hole
-    // Split the tablet at one end of the range
-    Mutation m = new KeyExtent(tableId, new Text("t"), new Text("p")).getPrevRowUpdateMutation();
-    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
-    TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(new Text("o")));
-    update(connector, m);
-
-    // do the state check
-    MergeStats stats = scan(state, metaDataStateStore);
-    MergeState newState = stats.nextMergeState(connector, state);
-    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
-
-    // unassign the tablets
-    BatchDeleter deleter = connector.createBatchDeleter(MetadataTable.NAME, Authorizations.EMPTY, 1000, new BatchWriterConfig());
-    deleter.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
-    deleter.setRanges(Collections.singletonList(new Range()));
-    deleter.delete();
-
-    // now we should be ready to merge but, we have inconsistent metadata
-    stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
-
-    // finish the split
-    KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
-    m = tablet.getPrevRowUpdateMutation();
-    TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5".getBytes()));
-    update(connector, m);
-    metaDataStateStore.setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer)));
-
-    // onos... there's a new tablet online
-    stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
-
-    // chop it
-    m = tablet.getPrevRowUpdateMutation();
-    ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("junk".getBytes()));
-    update(connector, m);
-
-    stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
-
-    // take it offline
-    m = tablet.getPrevRowUpdateMutation();
-    Collection<Collection<String>> walogs = Collections.emptyList();
-    metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null);
-
-    // now we can split
-    stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
-
-  }
-
-  private MergeStats scan(MockCurrentState state, MetaDataStateStore metaDataStateStore) {
-    MergeStats stats = new MergeStats(state.mergeInfo);
-    stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE);
-    for (TabletLocationState tss : metaDataStateStore) {
-      stats.update(tss.extent, tss.getState(state.onlineTabletServers()), tss.chopped, false);
-    }
-    return stats;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
deleted file mode 100644
index 864a79d..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/FinishedWorkUpdaterTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-
-public class FinishedWorkUpdaterTest {
-
-  @Rule
-  public TestName test = new TestName();
-
-  private Connector conn;
-  private FinishedWorkUpdater updater;
-
-  @Before
-  public void setup() throws Exception {
-    Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    conn = inst.getConnector("root", new PasswordToken(""));
-    updater = new FinishedWorkUpdater(conn);
-  }
-
-  @Test
-  public void offlineReplicationTableFailsGracefully() {
-    updater.run();
-  }
-
-  @Test
-  public void recordsWithProgressUpdateBothTables() throws Exception {
-    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    Status stat = Status.newBuilder().setBegin(100).setEnd(200).setClosed(true).setInfiniteEnd(false).build();
-    ReplicationTarget target = new ReplicationTarget("peer", "table1", "1");
-
-    // Create a single work record for a file to some peer
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    Mutation m = new Mutation(file);
-    WorkSection.add(m, target.toText(), ProtobufUtil.toValue(stat));
-    bw.addMutation(m);
-    bw.close();
-
-    updater.run();
-
-    Scanner s = ReplicationTable.getScanner(conn);
-    s.setRange(Range.exact(file));
-    StatusSection.limit(s);
-    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
-    Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
-    Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target.getSourceTableId());
-
-    // We should only rely on the correct begin attribute being returned
-    Status actual = Status.parseFrom(entry.getValue().get());
-    Assert.assertEquals(stat.getBegin(), actual.getBegin());
-  }
-
-  @Test
-  public void chooseMinimumBeginOffset() throws Exception {
-    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    // @formatter:off
-    Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
-        stat2 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(false).build(),
-        stat3 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(false).build();
-    ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
-        target2 = new ReplicationTarget("peer2", "table2", "1"),
-        target3 = new ReplicationTarget("peer3", "table3", "1");
-    // @formatter:on
-
-    // Create a single work record for a file to some peer
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    Mutation m = new Mutation(file);
-    WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
-    WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
-    WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
-    bw.addMutation(m);
-    bw.close();
-
-    updater.run();
-
-    Scanner s = ReplicationTable.getScanner(conn);
-    s.setRange(Range.exact(file));
-    StatusSection.limit(s);
-    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
-    Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
-    Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
-
-    // We should only rely on the correct begin attribute being returned
-    Status actual = Status.parseFrom(entry.getValue().get());
-    Assert.assertEquals(1, actual.getBegin());
-  }
-
-  @Test
-  public void chooseMinimumBeginOffsetInfiniteEnd() throws Exception {
-    String file = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
-    // @formatter:off
-    Status stat1 = Status.newBuilder().setBegin(100).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
-        stat2 = Status.newBuilder().setBegin(1).setEnd(1000).setClosed(true).setInfiniteEnd(true).build(),
-        stat3 = Status.newBuilder().setBegin(500).setEnd(1000).setClosed(true).setInfiniteEnd(true).build();
-    ReplicationTarget target1 = new ReplicationTarget("peer1", "table1", "1"),
-        target2 = new ReplicationTarget("peer2", "table2", "1"),
-        target3 = new ReplicationTarget("peer3", "table3", "1");
-    // @formatter:on
-
-    // Create a single work record for a file to some peer
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    Mutation m = new Mutation(file);
-    WorkSection.add(m, target1.toText(), ProtobufUtil.toValue(stat1));
-    WorkSection.add(m, target2.toText(), ProtobufUtil.toValue(stat2));
-    WorkSection.add(m, target3.toText(), ProtobufUtil.toValue(stat3));
-    bw.addMutation(m);
-    bw.close();
-
-    updater.run();
-
-    Scanner s = ReplicationTable.getScanner(conn);
-    s.setRange(Range.exact(file));
-    StatusSection.limit(s);
-    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
-
-    Assert.assertEquals(entry.getKey().getColumnFamily(), StatusSection.NAME);
-    Assert.assertEquals(entry.getKey().getColumnQualifier().toString(), target1.getSourceTableId());
-
-    // We should only rely on the correct begin attribute being returned
-    Status actual = Status.parseFrom(entry.getValue().get());
-    Assert.assertEquals(1, actual.getBegin());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
deleted file mode 100644
index 2555077..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-
-public class RemoveCompleteReplicationRecordsTest {
-
-  private RemoveCompleteReplicationRecords rcrr;
-  private Instance inst;
-  private Connector conn;
-
-  @Rule
-  public TestName test = new TestName();
-
-  @Before
-  public void initialize() throws Exception {
-    inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    conn = inst.getConnector("root", new PasswordToken(""));
-    rcrr = new RemoveCompleteReplicationRecords(conn);
-  }
-
-  @Test
-  public void notYetReplicationRecordsIgnored() throws Exception {
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    int numRecords = 3;
-    for (int i = 0; i < numRecords; i++) {
-      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-      Mutation m = new Mutation(file);
-      StatusSection.add(m, new Text(Integer.toString(i)), StatusUtil.openWithUnknownLengthValue());
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
-    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
-    bs.setRanges(Collections.singleton(new Range()));
-    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
-    bs.addScanIterator(cfg);
-    bw = EasyMock.createMock(BatchWriter.class);
-
-    EasyMock.replay(bw);
-
-    rcrr.removeCompleteRecords(conn, bs, bw);
-    bs.close();
-
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-  }
-
-  @Test
-  public void partiallyReplicatedRecordsIgnored() throws Exception {
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    int numRecords = 3;
-    Status.Builder builder = Status.newBuilder();
-    builder.setClosed(false);
-    builder.setEnd(10000);
-    builder.setInfiniteEnd(false);
-    for (int i = 0; i < numRecords; i++) {
-      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-      Mutation m = new Mutation(file);
-      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
-      bw.addMutation(m);
-    }
-
-    bw.close();
-
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
-    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
-    bs.setRanges(Collections.singleton(new Range()));
-    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
-    bs.addScanIterator(cfg);
-    bw = EasyMock.createMock(BatchWriter.class);
-
-    EasyMock.replay(bw);
-
-    // We don't remove any records, so we can just pass in a fake BW for both
-    rcrr.removeCompleteRecords(conn, bs, bw);
-    bs.close();
-
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-  }
-
-  @Test
-  public void replicatedClosedWorkRecordsAreNotRemovedWithoutClosedStatusRecords() throws Exception {
-    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
-    int numRecords = 3;
-
-    Status.Builder builder = Status.newBuilder();
-    builder.setClosed(false);
-    builder.setEnd(10000);
-    builder.setInfiniteEnd(false);
-
-    // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated
-    for (int i = 0; i < numRecords; i++) {
-      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-      Mutation m = new Mutation(file);
-      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
-      replBw.addMutation(m);
-    }
-
-    // Add two records that we can delete
-    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    Mutation m = new Mutation(fileToRemove);
-    StatusSection.add(m, new Text("5"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
-    replBw.addMutation(m);
-
-    numRecords++;
-
-    fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    m = new Mutation(fileToRemove);
-    StatusSection.add(m, new Text("6"), ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(false).build()));
-    replBw.addMutation(m);
-
-    numRecords++;
-
-    replBw.flush();
-
-    // Make sure that we have the expected number of records in both tables
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
-    // We should not remove any records because they're missing closed status
-    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
-    bs.setRanges(Collections.singleton(new Range()));
-    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
-    bs.addScanIterator(cfg);
-
-    try {
-      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
-    } finally {
-      bs.close();
-      replBw.close();
-    }
-  }
-
-  @Test
-  public void replicatedClosedRowsAreRemoved() throws Exception {
-    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
-    int numRecords = 3;
-
-    Status.Builder builder = Status.newBuilder();
-    builder.setClosed(false);
-    builder.setEnd(10000);
-    builder.setInfiniteEnd(false);
-
-    long time = System.currentTimeMillis();
-    // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated
-    for (int i = 0; i < numRecords; i++) {
-      builder.setCreatedTime(time++);
-      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-      Mutation m = new Mutation(file);
-      Value v = ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build());
-      StatusSection.add(m, new Text(Integer.toString(i)), v);
-      replBw.addMutation(m);
-      m = OrderSection.createMutation(file, time);
-      OrderSection.add(m, new Text(Integer.toString(i)), v);
-      replBw.addMutation(m);
-    }
-
-    Set<String> filesToRemove = new HashSet<>();
-    // We created two mutations for each file
-    numRecords *= 2;
-    int finalNumRecords = numRecords;
-
-    // Add two records that we can delete
-    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    filesToRemove.add(fileToRemove);
-    Mutation m = new Mutation(fileToRemove);
-    ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
-    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
-    StatusSection.add(m, new Text("5"), value);
-    WorkSection.add(m, target.toText(), value);
-    replBw.addMutation(m);
-
-    m = OrderSection.createMutation(fileToRemove, time);
-    OrderSection.add(m, new Text("5"), value);
-    replBw.addMutation(m);
-    time++;
-
-    numRecords += 3;
-
-    fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    filesToRemove.add(fileToRemove);
-    m = new Mutation(fileToRemove);
-    value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
-    target = new ReplicationTarget("peer1", "6", "6");
-    StatusSection.add(m, new Text("6"), value);
-    WorkSection.add(m, target.toText(), value);
-    replBw.addMutation(m);
-
-    m = OrderSection.createMutation(fileToRemove, time);
-    OrderSection.add(m, new Text("6"), value);
-    replBw.addMutation(m);
-    time++;
-
-    numRecords += 3;
-
-    replBw.flush();
-
-    // Make sure that we have the expected number of records in both tables
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
-    // We should remove the two fully completed records we inserted
-    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
-    bs.setRanges(Collections.singleton(new Range()));
-    StatusSection.limit(bs);
-    WorkSection.limit(bs);
-    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
-    bs.addScanIterator(cfg);
-
-    try {
-      Assert.assertEquals(4l, rcrr.removeCompleteRecords(conn, bs, replBw));
-    } finally {
-      bs.close();
-      replBw.close();
-    }
-
-    int actualRecords = 0;
-    for (Entry<Key,Value> entry : ReplicationTable.getScanner(conn)) {
-      Assert.assertFalse(filesToRemove.contains(entry.getKey().getRow().toString()));
-      actualRecords++;
-    }
-
-    Assert.assertEquals(finalNumRecords, actualRecords);
-  }
-
-  @Test
-  public void partiallyReplicatedEntriesPrecludeRowDeletion() throws Exception {
-    BatchWriter replBw = ReplicationTable.getBatchWriter(conn);
-    int numRecords = 3;
-
-    Status.Builder builder = Status.newBuilder();
-    builder.setClosed(false);
-    builder.setEnd(10000);
-    builder.setInfiniteEnd(false);
-
-    // Write out numRecords entries to both replication and metadata tables, none of which are fully replicated
-    for (int i = 0; i < numRecords; i++) {
-      String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-      Mutation m = new Mutation(file);
-      StatusSection.add(m, new Text(Integer.toString(i)), ProtobufUtil.toValue(builder.setBegin(1000 * (i + 1)).build()));
-      replBw.addMutation(m);
-    }
-
-    // Add two records that we can delete
-    String fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
-    Mutation m = new Mutation(fileToRemove);
-    ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
-    Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).build());
-    StatusSection.add(m, new Text("5"), value);
-    WorkSection.add(m, target.toText(), value);
-    target = new ReplicationTarget("peer2", "5", "5");
-    WorkSection.add(m, target.toText(), value);
-    target = new ReplicationTarget("peer3", "5", "5");
-    WorkSection.add(m, target.toText(), ProtobufUtil.toValue(builder.setClosed(false).build()));
-    replBw.addMutation(m);
-
-    numRecords += 4;
-
-    replBw.flush();
-
-    // Make sure that we have the expected number of records in both tables
-    Assert.assertEquals(numRecords, Iterables.size(ReplicationTable.getScanner(conn)));
-
-    // We should remove the two fully completed records we inserted
-    BatchScanner bs = ReplicationTable.getBatchScanner(conn, 1);
-    bs.setRanges(Collections.singleton(new Range()));
-    IteratorSetting cfg = new IteratorSetting(50, WholeRowIterator.class);
-    bs.addScanIterator(cfg);
-
-    try {
-      Assert.assertEquals(0l, rcrr.removeCompleteRecords(conn, bs, replBw));
-    } finally {
-      bs.close();
-      replBw.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index d4675db..45fe959 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -18,260 +18,35 @@ package org.apache.accumulo.master.replication;
 
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 public class SequentialWorkAssignerTest {
 
-  @Rule
-  public TestName test = new TestName();
-
-  private AccumuloConfiguration conf;
   private Connector conn;
-  private Connector mockConn;
   private SequentialWorkAssigner assigner;
 
   @Before
   public void init() throws Exception {
-    conf = createMock(AccumuloConfiguration.class);
+    AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
     conn = createMock(Connector.class);
     assigner = new SequentialWorkAssigner(conf, conn);
-
-    Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    mockConn = inst.getConnector("root", new PasswordToken(""));
-    // Set the connector
-    assigner.setConnector(mockConn);
-    // grant ourselves write to the replication table
-    mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-  }
-
-  @Test
-  public void createWorkForFilesInCorrectOrder() throws Exception {
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget = target.toText();
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    // We want the name of file2 to sort before file1
-    String filename1 = "z_file1", filename2 = "a_file1";
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    // File1 was closed before file2, however
-    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
-    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), file1);
-    expectLastCall().once();
-
-    // file2 is *not* queued because file1 must be replicated first
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-
-    Assert.assertEquals(1, queuedWork.size());
-    Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
-    Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
-  }
-
-  @Test
-  public void workAcrossTablesHappensConcurrently() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget1 = target1.toText();
-
-    ReplicationTarget target2 = new ReplicationTarget("cluster1", "table2", "2");
-    Text serializedTarget2 = target2.toText();
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    // We want the name of file2 to sort before file1
-    String filename1 = "z_file1", filename2 = "a_file1";
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    // File1 was closed before file2, however
-    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
-    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
-    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
-    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
-    expectLastCall().once();
-
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
-    expectLastCall().once();
-
-    // file2 is *not* queued because file1 must be replicated first
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-
-    Assert.assertEquals(1, queuedWork.size());
-    Assert.assertTrue(queuedWork.containsKey("cluster1"));
-
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
-    Assert.assertEquals(2, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
-
-    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
-  }
-
-  @Test
-  public void workAcrossPeersHappensConcurrently() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget1 = target1.toText();
-
-    ReplicationTarget target2 = new ReplicationTarget("cluster2", "table1", "1");
-    Text serializedTarget2 = target2.toText();
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    // We want the name of file2 to sort before file1
-    String filename1 = "z_file1", filename2 = "a_file1";
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    // File1 was closed before file2, however
-    Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
-    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
-    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
-    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), file1);
-    expectLastCall().once();
-
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), file2);
-    expectLastCall().once();
-
-    // file2 is *not* queued because file1 must be replicated first
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-
-    Assert.assertEquals(2, queuedWork.size());
-    Assert.assertTrue(queuedWork.containsKey("cluster1"));
-
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
-    Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
-
-    Map<String,String> cluster2Work = queuedWork.get("cluster2");
-    Assert.assertEquals(1, cluster2Work.size());
-    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -315,69 +90,4 @@ public class SequentialWorkAssignerTest {
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
   }
-
-  @Test
-  public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    Text serializedTarget = target.toText();
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    // We want the name of file2 to sort before file1
-    String filename1 = "z_file1", filename2 = "a_file1";
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    // File1 was closed before file2, however
-    Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
-    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file1, stat1.getCreatedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
-    bw.addMutation(m);
-
-    m = OrderSection.createMutation(file2, stat2.getCreatedTime());
-    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-
-    // Treat filename1 as we have already submitted it for replication
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    Map<String,String> queuedWorkForCluster = new HashMap<>();
-    queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
-    queuedWork.put("cluster1", queuedWorkForCluster);
-
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    // Make sure we expect the invocations in the correct order (accumulo is sorted)
-    workQueue.addWork(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), file2);
-    expectLastCall().once();
-
-    // file2 is queued because we remove file1 because it's fully replicated
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-
-    Assert.assertEquals(1, queuedWork.size());
-    Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
-    Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
deleted file mode 100644
index 11be4fb..0000000
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.master.replication;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.impl.Credentials;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-
-public class StatusMakerTest {
-
-  @Rule
-  public TestName test = new TestName();
-
-  private Connector conn;
-
-  @Before
-  public void setupInstance() throws Exception {
-    Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    Credentials creds = new Credentials("root", new PasswordToken(""));
-    conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-  }
-
-  @Test
-  public void statusRecordsCreated() throws Exception {
-    String sourceTable = "source";
-    conn.tableOperations().create(sourceTable);
-    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
-    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
-    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
-    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
-        walPrefix + UUID.randomUUID());
-    Map<String,Integer> fileToTableId = new HashMap<>();
-
-    int index = 1;
-    long timeCreated = 0;
-    Map<String,Long> fileToTimeCreated = new HashMap<>();
-    for (String file : files) {
-      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
-      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
-      fileToTimeCreated.put(file, timeCreated);
-      bw.addMutation(m);
-      fileToTableId.put(file, index);
-      index++;
-      timeCreated++;
-    }
-
-    bw.close();
-
-    StatusMaker statusMaker = new StatusMaker(conn);
-    statusMaker.setSourceTableName(sourceTable);
-
-    statusMaker.run();
-
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Text file = new Text(), tableId = new Text();
-    for (Entry<Key,Value> entry : s) {
-      StatusSection.getFile(entry.getKey(), file);
-      StatusSection.getTableId(entry.getKey(), tableId);
-
-      Assert.assertTrue("Found unexpected file: " + file, files.contains(file.toString()));
-      Assert.assertEquals(fileToTableId.get(file.toString()), new Integer(tableId.toString()));
-      timeCreated = fileToTimeCreated.get(file.toString());
-      Assert.assertNotNull(timeCreated);
-      Assert.assertEquals(StatusUtil.fileCreated(timeCreated), Status.parseFrom(entry.getValue().get()));
-    }
-  }
-
-  @Test
-  public void openMessagesAreNotDeleted() throws Exception {
-    String sourceTable = "source";
-    conn.tableOperations().create(sourceTable);
-    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
-    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
-    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
-    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
-        walPrefix + UUID.randomUUID());
-    Map<String,Integer> fileToTableId = new HashMap<>();
-
-    int index = 1;
-    long timeCreated = 0;
-    for (String file : files) {
-      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
-      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
-      bw.addMutation(m);
-      fileToTableId.put(file, index);
-      index++;
-      timeCreated++;
-    }
-
-    bw.close();
-
-    StatusMaker statusMaker = new StatusMaker(conn);
-    statusMaker.setSourceTableName(sourceTable);
-
-    statusMaker.run();
-
-    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    s.fetchColumnFamily(ReplicationSection.COLF);
-    Assert.assertEquals(files.size(), Iterables.size(s));
-  }
-
-  @Test
-  public void closedMessagesAreDeleted() throws Exception {
-    String sourceTable = "source";
-    conn.tableOperations().create(sourceTable);
-    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
-    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
-    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
-    Set<String> files = Sets.newHashSet(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
-        walPrefix + UUID.randomUUID());
-    Map<String,Integer> fileToTableId = new HashMap<>();
-
-    Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build();
-
-    int index = 1;
-    for (String file : files) {
-      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
-      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(stat));
-      bw.addMutation(m);
-      fileToTableId.put(file, index);
-      index++;
-    }
-
-    bw.close();
-
-    StatusMaker statusMaker = new StatusMaker(conn);
-    statusMaker.setSourceTableName(sourceTable);
-
-    statusMaker.run();
-
-    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    s.fetchColumnFamily(ReplicationSection.COLF);
-    for (Entry<Key,Value> e : s) {
-      System.out.println(e.getKey().toStringNoTruncate() + " " + e.getValue());
-    }
-    s = conn.createScanner(sourceTable, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    s.fetchColumnFamily(ReplicationSection.COLF);
-    Assert.assertEquals(0, Iterables.size(s));
-
-  }
-
-  @Test
-  public void closedMessagesCreateOrderRecords() throws Exception {
-    String sourceTable = "source";
-    conn.tableOperations().create(sourceTable);
-    ReplicationTableUtil.configureMetadataTable(conn, sourceTable);
-
-    BatchWriter bw = conn.createBatchWriter(sourceTable, new BatchWriterConfig());
-    String walPrefix = "hdfs://localhost:8020/accumulo/wals/tserver+port/";
-    List<String> files = Arrays.asList(walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(), walPrefix + UUID.randomUUID(),
-        walPrefix + UUID.randomUUID());
-    Map<String,Integer> fileToTableId = new HashMap<>();
-
-    Status.Builder statBuilder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true);
-
-    int index = 1;
-    long time = System.currentTimeMillis();
-    for (String file : files) {
-      statBuilder.setCreatedTime(time++);
-      Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
-      m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
-      bw.addMutation(m);
-      fileToTableId.put(file, index);
-      index++;
-    }
-
-    bw.close();
-
-    StatusMaker statusMaker = new StatusMaker(conn);
-    statusMaker.setSourceTableName(sourceTable);
-
-    statusMaker.run();
-
-    Scanner s = conn.createScanner(sourceTable, Authorizations.EMPTY);
-    s.setRange(ReplicationSection.getRange());
-    s.fetchColumnFamily(ReplicationSection.COLF);
-    Assert.assertEquals(0, Iterables.size(s));
-
-    s = ReplicationTable.getScanner(conn);
-    OrderSection.limit(s);
-    Iterator<Entry<Key,Value>> iter = s.iterator();
-    Assert.assertTrue("Found no order records in replication table", iter.hasNext());
-
-    Iterator<String> expectedFiles = files.iterator();
-    Text buff = new Text();
-    while (expectedFiles.hasNext() && iter.hasNext()) {
-      String file = expectedFiles.next();
-      Entry<Key,Value> entry = iter.next();
-
-      Assert.assertEquals(file, OrderSection.getFile(entry.getKey(), buff));
-      OrderSection.getTableId(entry.getKey(), buff);
-      Assert.assertEquals(fileToTableId.get(file).intValue(), Integer.parseInt(buff.toString()));
-    }
-
-    Assert.assertFalse("Found more files unexpectedly", expectedFiles.hasNext());
-    Assert.assertFalse("Found more entries in replication table unexpectedly", iter.hasNext());
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
index f4c53f0..a9af68b 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/UnorderedWorkAssignerTest.java
@@ -30,50 +30,29 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConstants;
-import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
 
 public class UnorderedWorkAssignerTest {
 
-  @Rule
-  public TestName test = new TestName();
-
-  private AccumuloConfiguration conf;
   private Connector conn;
-  private Connector mockConn;
   private UnorderedWorkAssigner assigner;
 
   @Before
   public void init() throws Exception {
-    conf = createMock(AccumuloConfiguration.class);
+    AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
     conn = createMock(Connector.class);
     assigner = new UnorderedWorkAssigner(conf, conn);
-
-    Instance inst = new org.apache.accumulo.core.client.mock.MockInstance(test.getMethodName());
-    mockConn = inst.getConnector("root", new PasswordToken(""));
   }
 
   @Test
@@ -122,106 +101,6 @@ public class UnorderedWorkAssignerTest {
   }
 
   @Test
-  public void createWorkForFilesNeedingIt() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
-    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-    String keyTarget1 = target1.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getRemoteIdentifier()
-        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target1.getSourceTableId(), keyTarget2 = target2.getPeerName()
-        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target2.getRemoteIdentifier() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR
-        + target2.getSourceTableId();
-
-    // Set the connector
-    assigner.setConnector(mockConn);
-
-    // grant ourselves write to the replication table
-    mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    Status.Builder builder = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).setCreatedTime(5l);
-    Status status1 = builder.build();
-    builder.setCreatedTime(10l);
-    Status status2 = builder.build();
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(status1));
-    bw.addMutation(m);
-    m = OrderSection.createMutation(file1, status1.getCreatedTime());
-    OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(status1));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(status2));
-    bw.addMutation(m);
-    m = OrderSection.createMutation(file2, status2.getCreatedTime());
-    OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(status2));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    HashSet<String> queuedWork = new HashSet<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    // Make sure we expect the invocations in the order they were created
-    String key = filename1 + "|" + keyTarget1;
-    workQueue.addWork(key, file1);
-    expectLastCall().once();
-
-    key = filename2 + "|" + keyTarget2;
-    workQueue.addWork(key, file2);
-    expectLastCall().once();
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-  }
-
-  @Test
-  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
-    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1", "1"), target2 = new ReplicationTarget("cluster1", "table2", "2");
-    Text serializedTarget1 = target1.toText(), serializedTarget2 = target2.toText();
-
-    // Set the connector
-    assigner.setConnector(mockConn);
-
-    // grant ourselves write to the replication table
-    mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
-    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
-
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
-    bw.addMutation(m);
-
-    m = new Mutation(file2);
-    WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    HashSet<String> queuedWork = new HashSet<>();
-    assigner.setQueuedWork(queuedWork);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-  }
-
-  @Test
   public void workNotInZooKeeperIsCleanedUp() {
     Set<String> queuedWork = new LinkedHashSet<>(Arrays.asList("wal1", "wal2"));
     assigner.setQueuedWork(queuedWork);
@@ -243,41 +122,4 @@ public class UnorderedWorkAssignerTest {
     Assert.assertTrue("Queued work was not emptied", queuedWork.isEmpty());
   }
 
-  @Test
-  public void workNotReAdded() throws Exception {
-    Set<String> queuedWork = new HashSet<>();
-
-    assigner.setQueuedWork(queuedWork);
-
-    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
-    String serializedTarget = target.getPeerName() + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getRemoteIdentifier()
-        + DistributedWorkQueueWorkAssignerHelper.KEY_SEPARATOR + target.getSourceTableId();
-
-    queuedWork.add("wal1|" + serializedTarget.toString());
-
-    // Set the connector
-    assigner.setConnector(mockConn);
-
-    // grant ourselves write to the replication table
-    mockConn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
-    // Create two mutations, both of which need replication work done
-    BatchWriter bw = ReplicationTable.getBatchWriter(mockConn);
-    String file1 = "/accumulo/wal/tserver+port/wal1";
-    Mutation m = new Mutation(file1);
-    WorkSection.add(m, target.toText(), StatusUtil.openWithUnknownLengthValue());
-    bw.addMutation(m);
-
-    bw.close();
-
-    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    assigner.setWorkQueue(workQueue);
-    assigner.setMaxQueueSize(Integer.MAX_VALUE);
-
-    replay(workQueue);
-
-    assigner.createWork();
-
-    verify(workQueue);
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index d6d0b9a..ec849f4 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -16,185 +16,16 @@
  */
 package org.apache.accumulo.master.replication;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 
 public class WorkMakerTest {
 
-  private Instance instance;
-  private Connector conn;
-
-  @Rule
-  public TestName name = new TestName();
-  private AccumuloServerContext context;
-
-  @Before
-  public void createMockAccumulo() throws Exception {
-    instance = new org.apache.accumulo.core.client.mock.MockInstance(name.getMethodName());
-    context = new AccumuloServerContext(new ServerConfigurationFactory(instance));
-    conn = context.getConnector();
-    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-    conn.tableOperations().deleteRows(ReplicationTable.NAME, null, null);
-  }
-
-  @Test
-  public void singleUnitSingleTarget() throws Exception {
-    String table = name.getMethodName();
-    conn.tableOperations().create(name.getMethodName());
-    String tableId = conn.tableOperations().tableIdMap().get(table);
-    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
-    // Create a status record for a file
-    long timeCreated = System.currentTimeMillis();
-    Mutation m = new Mutation(new Path(file).toString());
-    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(timeCreated));
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    bw.addMutation(m);
-    bw.flush();
-
-    // Assert that we have one record in the status section
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Assert.assertEquals(1, Iterables.size(s));
-
-    WorkMaker workMaker = new WorkMaker(context, conn);
-
-    // Invoke the addWorkRecord method to create a Work record from the Status record earlier
-    ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4", tableId);
-    workMaker.setBatchWriter(bw);
-    workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(timeCreated), ImmutableMap.of("remote_cluster_1", "4"), tableId);
-
-    // Scan over just the WorkSection
-    s = ReplicationTable.getScanner(conn);
-    WorkSection.limit(s);
-
-    Entry<Key,Value> workEntry = Iterables.getOnlyElement(s);
-    Key workKey = workEntry.getKey();
-    ReplicationTarget actual = ReplicationTarget.from(workKey.getColumnQualifier());
-
-    Assert.assertEquals(file, workKey.getRow().toString());
-    Assert.assertEquals(WorkSection.NAME, workKey.getColumnFamily());
-    Assert.assertEquals(expected, actual);
-    Assert.assertEquals(workEntry.getValue(), StatusUtil.fileCreatedValue(timeCreated));
-  }
-
-  @Test
-  public void singleUnitMultipleTargets() throws Exception {
-    String table = name.getMethodName();
-    conn.tableOperations().create(name.getMethodName());
-
-    String tableId = conn.tableOperations().tableIdMap().get(table);
-
-    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
-    Mutation m = new Mutation(new Path(file).toString());
-    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-    BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
-    bw.addMutation(m);
-    bw.flush();
-
-    // Assert that we have one record in the status section
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Assert.assertEquals(1, Iterables.size(s));
-
-    WorkMaker workMaker = new WorkMaker(context, conn);
-
-    Map<String,String> targetClusters = ImmutableMap.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8");
-    Set<ReplicationTarget> expectedTargets = new HashSet<>();
-    for (Entry<String,String> cluster : targetClusters.entrySet()) {
-      expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId));
-    }
-    workMaker.setBatchWriter(bw);
-    workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(System.currentTimeMillis()), targetClusters, tableId);
-
-    s = ReplicationTable.getScanner(conn);
-    WorkSection.limit(s);
-
-    Set<ReplicationTarget> actualTargets = new HashSet<>();
-    for (Entry<Key,Value> entry : s) {
-      Assert.assertEquals(file, entry.getKey().getRow().toString());
-      Assert.assertEquals(WorkSection.NAME, entry.getKey().getColumnFamily());
-
-      ReplicationTarget target = ReplicationTarget.from(entry.getKey().getColumnQualifier());
-      actualTargets.add(target);
-    }
-
-    for (ReplicationTarget expected : expectedTargets) {
-      Assert.assertTrue("Did not find expected target: " + expected, actualTargets.contains(expected));
-      actualTargets.remove(expected);
-    }
-
-    Assert.assertTrue("Found extra replication work entries: " + actualTargets, actualTargets.isEmpty());
-  }
-
-  @Test
-  public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception {
-    String table = name.getMethodName();
-    conn.tableOperations().create(name.getMethodName());
-    String tableId = conn.tableOperations().tableIdMap().get(table);
-    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
-
-    Mutation m = new Mutation(new Path(file).toString());
-    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-    bw.addMutation(m);
-    bw.flush();
-
-    // Assert that we have one record in the status section
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    Assert.assertEquals(1, Iterables.size(s));
-
-    WorkMaker workMaker = new WorkMaker(context, conn);
-
-    conn.tableOperations().setProperty(ReplicationTable.NAME, Property.TABLE_REPLICATION_TARGET.getKey() + "remote_cluster_1", "4");
-
-    workMaker.setBatchWriter(bw);
-
-    // If we don't shortcircuit out, we should get an exception because ServerConfiguration.getTableConfiguration
-    // won't work with MockAccumulo
-    workMaker.run();
-
-    s = ReplicationTable.getScanner(conn);
-    WorkSection.limit(s);
-
-    Assert.assertEquals(0, Iterables.size(s));
-  }
-
   @Test
   public void closedStatusRecordsStillMakeWork() throws Exception {
-    WorkMaker workMaker = new WorkMaker(context, conn);
+    WorkMaker workMaker = new WorkMaker(null, null);
 
     Assert.assertFalse(workMaker.shouldCreateWork(StatusUtil.fileCreated(System.currentTimeMillis())));
     Assert.assertTrue(workMaker.shouldCreateWork(StatusUtil.ingestedUntil(1000)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
----------------------------------------------------------------------
diff --git a/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java b/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
deleted file mode 100644
index 02a9ee2..0000000
--- a/shell/src/test/java/org/apache/accumulo/shell/commands/FormatterCommandTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.shell.commands;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.Iterator;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.format.Formatter;
-import org.apache.accumulo.shell.Shell;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Tests the shell output with Formatters
- */
-public class FormatterCommandTest {
-  ByteArrayOutputStream out = null;
-  InputStream in = null;
-
-  @Test
-  public void test() throws IOException, AccumuloException, AccumuloSecurityException, TableExistsException, ClassNotFoundException {
-    // Keep the Shell AUDIT log off the test output
-    Logger.getLogger(Shell.class).setLevel(Level.WARN);
-
-    File config = Files.createTempFile(null, null).toFile();
-    config.deleteOnExit();
-    final String[] args = new String[] {"--config-file", config.toString(), "--fake", "-u", "root", "-p", ""};
-
-    final String[] commands = createCommands();
-
-    in = org.apache.accumulo.shell.mock.MockShell.makeCommands(commands);
-    out = new ByteArrayOutputStream();
-
-    final Shell shell = new org.apache.accumulo.shell.mock.MockShell(in, out);
-    assertTrue("Failed to configure shell without error", shell.config(args));
-
-    // Can't call createtable in the shell with MockAccumulo
-    shell.getConnector().tableOperations().create("test");
-
-    try {
-      shell.start();
-    } catch (Exception e) {
-      Assert.fail("Exception while running commands: " + e.getMessage());
-    }
-
-    shell.getReader().flush();
-
-    final String[] output = new String(out.toByteArray()).split("\n\r");
-
-    boolean formatterOn = false;
-
-    final String[] expectedDefault = new String[] {"row cf:cq []    1234abcd", "row cf1:cq1 []    9876fedc", "row2 cf:cq []    13579bdf",
-        "row2 cf1:cq []    2468ace"};
-
-    final String[] expectedFormatted = new String[] {"row cf:cq []    0x31 0x32 0x33 0x34 0x61 0x62 0x63 0x64",
-        "row cf1:cq1 []    0x39 0x38 0x37 0x36 0x66 0x65 0x64 0x63", "row2 cf:cq []    0x31 0x33 0x35 0x37 0x39 0x62 0x64 0x66",
-        "row2 cf1:cq []    0x32 0x34 0x36 0x38 0x61 0x63 0x65"};
-
-    int outputIndex = 0;
-    while (outputIndex < output.length) {
-      final String line = output[outputIndex];
-
-      if (line.startsWith("root@mock-instance")) {
-        if (line.contains("formatter")) {
-          formatterOn = true;
-        }
-
-        outputIndex++;
-      } else if (line.startsWith("row")) {
-        int expectedIndex = 0;
-        String[] comparisonData;
-
-        // Pick the type of data we expect (formatted or default)
-        if (formatterOn) {
-          comparisonData = expectedFormatted;
-        } else {
-          comparisonData = expectedDefault;
-        }
-
-        // Ensure each output is what we expected
-        while (expectedIndex + outputIndex < output.length && expectedIndex < expectedFormatted.length) {
-          Assert.assertEquals(comparisonData[expectedIndex].trim(), output[expectedIndex + outputIndex].trim());
-          expectedIndex++;
-        }
-
-        outputIndex += expectedIndex;
-      }
-    }
-  }
-
-  private String[] createCommands() {
-    return new String[] {"table test", "insert row cf cq 1234abcd", "insert row cf1 cq1 9876fedc", "insert row2 cf cq 13579bdf", "insert row2 cf1 cq 2468ace",
-        "scan", "formatter -t test -f org.apache.accumulo.core.util.shell.command.FormatterCommandTest$HexFormatter", "scan"};
-  }
-
-  /**
-   * <p>
-   * Simple <code>Formatter</code> that will convert each character in the Value from decimal to hexadecimal. Will automatically skip over characters in the
-   * value which do not fall within the [0-9,a-f] range.
-   * </p>
-   *
-   * <p>
-   * Example: <code>'0'</code> will be displayed as <code>'0x30'</code>
-   * </p>
-   */
-  public static class HexFormatter implements Formatter {
-    private Iterator<Entry<Key,Value>> iter = null;
-    private boolean printTs = false;
-
-    private final static String tab = "\t";
-    private final static String newline = "\n";
-
-    public HexFormatter() {}
-
-    @Override
-    public boolean hasNext() {
-      return this.iter.hasNext();
-    }
-
-    @Override
-    public String next() {
-      final Entry<Key,Value> entry = iter.next();
-
-      String key;
-
-      // Observe the timestamps
-      if (printTs) {
-        key = entry.getKey().toString();
-      } else {
-        key = entry.getKey().toStringNoTime();
-      }
-
-      final Value v = entry.getValue();
-
-      // Approximate how much space we'll need
-      final StringBuilder sb = new StringBuilder(key.length() + v.getSize() * 5);
-
-      sb.append(key).append(tab);
-
-      for (byte b : v.get()) {
-        if ((b >= 48 && b <= 57) || (b >= 97 && b <= 102)) {
-          sb.append(String.format("0x%x ", Integer.valueOf(b)));
-        }
-      }
-
-      sb.append(newline);
-
-      return sb.toString();
-    }
-
-    @Override
-    public void remove() {}
-
-    @Override
-    public void initialize(final Iterable<Entry<Key,Value>> scanner, final boolean printTimestamps) {
-      this.iter = scanner.iterator();
-      this.printTs = printTimestamps;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/cc3c0111/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
deleted file mode 100644
index 6b5e6eb..0000000
--- a/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
-import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacBase;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.junit.Test;
-
-/**
- * Prevent regression of ACCUMULO-3709.
- */
-public class AccumuloOutputFormatIT extends ConfigurableMacBase {
-
-  private static final String TABLE = "abc";
-
-  @Override
-  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-    cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "1");
-    cfg.setNumTservers(1);
-  }
-
-  @Test
-  public void testMapred() throws Exception {
-    Connector connector = getConnector();
-    // create a table and put some data in it
-    connector.tableOperations().create(TABLE);
-
-    JobConf job = new JobConf();
-    BatchWriterConfig batchConfig = new BatchWriterConfig();
-    // no flushes!!!!!
-    batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
-    // use a single thread to ensure our update session times out
-    batchConfig.setMaxWriteThreads(1);
-    // set the max memory so that we ensure we don't flush on the write.
-    batchConfig.setMaxMemory(Long.MAX_VALUE);
-    AccumuloOutputFormat outputFormat = new AccumuloOutputFormat();
-    AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig);
-    AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig());
-    AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD));
-    RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null);
-
-    try {
-      for (int i = 0; i < 3; i++) {
-        Mutation m = new Mutation(new Text(String.format("%08d", i)));
-        for (int j = 0; j < 3; j++) {
-          m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8)));
-        }
-        writer.write(new Text(TABLE), m);
-      }
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      // we don't want the exception to come from write
-    }
-
-    connector.securityOperations().revokeTablePermission("root", TABLE, TablePermission.WRITE);
-
-    try {
-      writer.close(null);
-      fail("Did not throw exception");
-    } catch (IOException ex) {
-      log.info(ex.getMessage(), ex);
-      assertTrue(ex.getCause() instanceof MutationsRejectedException);
-    }
-  }
-}