You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 04:00:04 UTC

[45/50] [abbrv] git commit: ACCUMULO-2819 Add a test to ensure that an in-progress file that's fully replicated is removed.

ACCUMULO-2819 Add a test to ensure that an in-progress file that's fully replicated is removed.

This is mainly an optimization, since such a file would otherwise be cleaned up outside of the
work assignment loop, but removing it here should make the work assignment loop more responsive.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/50117b35
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/50117b35
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/50117b35

Branch: refs/heads/ACCUMULO-378
Commit: 50117b35535f7701ebde58f089489dd8dbcd432d
Parents: 57dfe48
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 14:06:14 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 14:06:14 2014 -0400

----------------------------------------------------------------------
 .../replication/SequentialWorkAssignerTest.java | 76 ++++++++++++++++++++
 1 file changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/50117b35/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index ebc540f..0820e1c 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -345,4 +345,80 @@ public class SequentialWorkAssignerTest {
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertEquals(AbstractWorkAssigner.getQueueKey("file2", new ReplicationTarget("cluster1", "2", "2")), cluster1Work.get("2"));
   }
+
+  @Test
+  public void reprocessingOfCompletedWorkRemovesWork() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1", "1");
+    Text serializedTarget = target.toText();
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Hand the assigner a connector to the mock instance
+    assigner.setConnector(conn);
+
+    // Create the replication table and grant ourselves write permission on it
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Write work and order entries for two files: file1 has begin == end (presumably nothing left to replicate), file2 still spans 0-100
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    // We want the name of file2 to sort before file1
+    String filename1 = "z_file1", filename2 = "a_file1";
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+
+    // File1 was closed before file2 (closed time 250 vs. 500), however
+    Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
+    Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file1, stat1.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
+    bw.addMutation(m);
+
+    m = OrderSection.createMutation(file2, stat2.getClosedTime());
+    OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+    // Treat filename1 as if we have already submitted it for replication
+    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,String> queuedWorkForCluster = new HashMap<>();
+    queuedWorkForCluster.put(target.getSourceTableId(), AbstractWorkAssigner.getQueueKey(filename1, target));
+    queuedWork.put("cluster1", queuedWorkForCluster);
+
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+    assigner.setMaxQueueSize(Integer.MAX_VALUE);
+
+    // The only addWork call we expect is the one for file2
+    workQueue.addWork(AbstractWorkAssigner.getQueueKey(filename2, target), file2);
+    expectLastCall().once();
+
+    // No addWork call is expected for file1; the assertions below check that file2's queue key replaces file1's in queuedWork
+
+    replay(workQueue);
+
+    assigner.createWork();
+
+    verify(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertTrue(queuedWork.containsKey("cluster1"));
+    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Assert.assertEquals(1, cluster1Work.size());
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(AbstractWorkAssigner.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
+  }
 }