Posted to commits@accumulo.apache.org by mm...@apache.org on 2017/09/06 22:51:55 UTC

[accumulo] 01/02: ACCUMULO-4662 Refactored TableID in replication

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 7b35311b8a27ce3bc3a2c12a8e348ddb7c7cfde6
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Sep 6 16:44:03 2017 -0400

    ACCUMULO-4662 Refactored TableID in replication
---
 .../master/replication/SequentialWorkAssigner.java | 41 +++++++++---------
 .../replication/SequentialWorkAssignerTest.java    | 10 ++---
 .../test/replication/SequentialWorkAssignerIT.java | 48 +++++++++++-----------
 3 files changed, 50 insertions(+), 49 deletions(-)
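
In short: the per-peer queued-work maps are now keyed by Table.ID rather than by the canonical String form of the table id (target.getSourceTableId().canonicalID()). Below is a minimal sketch of the new pattern; it is not part of the commit, it assumes the master-branch internal classes Table.ID and ReplicationTarget shown in the diffs that follow, and the work-key value is a made-up placeholder for what DistributedWorkQueueWorkAssignerHelper.getQueueKey(...) would produce.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.accumulo.core.client.impl.Table;
    import org.apache.accumulo.core.replication.ReplicationTarget;

    public class TableIdKeyedWorkSketch {
      public static void main(String[] args) {
        // peer name -> (source table id -> queued work key), mirroring queuedWorkByPeerName
        Map<String,Map<Table.ID,String>> queuedWorkByPeerName = new HashMap<>();

        // Constructor arguments as used in the test diffs below: peer name, remote identifier, source table id
        ReplicationTarget target = new ReplicationTarget("cluster1", "1", Table.ID.of("1"));

        Map<Table.ID,String> workForPeer = queuedWorkByPeerName.get(target.getPeerName());
        if (null == workForPeer) {
          workForPeer = new HashMap<>();
          queuedWorkByPeerName.put(target.getPeerName(), workForPeer);
        }

        // Previously the key was target.getSourceTableId().canonicalID() (a String);
        // the Table.ID instance is now used directly. "example-work-key" is a placeholder value.
        workForPeer.put(target.getSourceTableId(), "example-work-key");

        System.out.println(queuedWorkByPeerName);
      }
    }
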

diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f273a3f..c07707c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -53,7 +54,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
    * }
    */
   // @formatter:on
-  private Map<String,Map<String,String>> queuedWorkByPeerName;
+  private Map<String,Map<Table.ID,String>> queuedWorkByPeerName;
 
   public SequentialWorkAssigner() {}
 
@@ -66,11 +67,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
     return NAME;
   }
 
-  protected Map<String,Map<String,String>> getQueuedWork() {
+  protected Map<String,Map<Table.ID,String>> getQueuedWork() {
     return queuedWorkByPeerName;
   }
 
-  protected void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+  protected void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
     this.queuedWorkByPeerName = queuedWork;
   }
 
@@ -97,11 +98,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
       Entry<String,ReplicationTarget> entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work);
       String filename = entry.getKey();
       String peerName = entry.getValue().getPeerName();
-      String sourceTableId = entry.getValue().getSourceTableId().canonicalID();
+      Table.ID sourceTableId = entry.getValue().getSourceTableId();
 
       log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName);
 
-      Map<String,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
+      Map<Table.ID,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
       if (null == replicationForPeer) {
         replicationForPeer = new HashMap<>();
         queuedWorkByPeerName.put(peerName, replicationForPeer);
@@ -116,24 +117,24 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
    */
   @Override
   protected void cleanupFinishedWork() {
-    final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
+    final Iterator<Entry<String,Map<Table.ID,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
     final String instanceId = conn.getInstance().getInstanceID();
 
     int elementsRemoved = 0;
     // Check the status of all the work we've queued up
     while (queuedWork.hasNext()) {
       // {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...}
-      Entry<String,Map<String,String>> workForPeer = queuedWork.next();
+      Entry<String,Map<Table.ID,String>> workForPeer = queuedWork.next();
 
       // TableID to workKey (filename and ReplicationTarget)
-      Map<String,String> queuedReplication = workForPeer.getValue();
+      Map<Table.ID,String> queuedReplication = workForPeer.getValue();
 
-      Iterator<Entry<String,String>> iter = queuedReplication.entrySet().iterator();
+      Iterator<Entry<Table.ID,String>> iter = queuedReplication.entrySet().iterator();
       // Loop over every target we need to replicate this file to, removing the target when
       // the replication task has finished
       while (iter.hasNext()) {
         // tableID -> workKey
-        Entry<String,String> entry = iter.next();
+        Entry<Table.ID,String> entry = iter.next();
         // Null equates to the work for this target was finished
         if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + entry.getValue())) {
           log.debug("Removing {} from work assignment state", entry.getValue());
@@ -153,12 +154,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected boolean shouldQueueWork(ReplicationTarget target) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       return true;
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
 
     // If we have no work for the local table to the given peer, submit some!
     return null == queuedWork;
@@ -167,17 +168,17 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
   @Override
   protected boolean queueWork(Path path, ReplicationTarget target) {
     String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
-    Map<String,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == workForPeer) {
       workForPeer = new HashMap<>();
       this.queuedWorkByPeerName.put(target.getPeerName(), workForPeer);
     }
 
-    String queuedWork = workForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = workForPeer.get(target.getSourceTableId());
     if (null == queuedWork) {
       try {
         workQueue.addWork(queueKey, path.toString());
-        workForPeer.put(target.getSourceTableId().canonicalID(), queueKey);
+        workForPeer.put(target.getSourceTableId(), queueKey);
       } catch (KeeperException | InterruptedException e) {
         log.warn("Could not queue work for {} to {}", path, target, e);
         return false;
@@ -195,12 +196,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected Set<String> getQueuedWork(ReplicationTarget target) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       return Collections.emptySet();
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
     if (null == queuedWork) {
       return Collections.emptySet();
     } else {
@@ -210,15 +211,15 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName());
       return;
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
     if (queuedWork.equals(queueKey)) {
-      queuedWorkForPeer.remove(target.getSourceTableId().canonicalID());
+      queuedWorkForPeer.remove(target.getSourceTableId());
     } else {
       log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork);
       return;
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 26a090a..38c63f6 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -56,12 +56,12 @@ public class SequentialWorkAssignerTest {
     ZooCache zooCache = createMock(ZooCache.class);
     Instance inst = createMock(Instance.class);
 
-    Map<String,Map<String,String>> queuedWork = new TreeMap<>();
-    Map<String,String> cluster1Work = new TreeMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new TreeMap<>();
+    Map<Table.ID,String> cluster1Work = new TreeMap<>();
 
     // Two files for cluster1, one for table '1' and another for table '2', that we have assigned work for
-    cluster1Work.put("1", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
-    cluster1Work.put("2", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
+    cluster1Work.put(Table.ID.of("1"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
+    cluster1Work.put(Table.ID.of("2"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
 
     queuedWork.put("cluster1", cluster1Work);
 
@@ -90,6 +90,6 @@ public class SequentialWorkAssignerTest {
 
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))),
-        cluster1Work.get("2"));
+        cluster1Work.get(Table.ID.of("2")));
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
index 2a7b853..7b2055f 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
@@ -62,7 +62,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     }
 
     @Override
-    public void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+    public void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
       super.setQueuedWork(queuedWork);
     }
 
@@ -137,7 +137,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -156,10 +156,10 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
 
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
   }
 
   @Test
@@ -199,7 +199,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -222,13 +222,13 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
 
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(2, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
-    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -268,7 +268,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -291,15 +291,15 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     Assert.assertEquals(2, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
 
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
-    Map<String,String> cluster2Work = queuedWork.get("cluster2");
+    Map<Table.ID,String> cluster2Work = queuedWork.get("cluster2");
     Assert.assertEquals(1, cluster2Work.size());
-    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -338,9 +338,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
 
     // Treat filename1 as we have already submitted it for replication
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    Map<String,String> queuedWorkForCluster = new HashMap<>();
-    queuedWorkForCluster.put(target.getSourceTableId().canonicalID(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
+    Map<Table.ID,String> queuedWorkForCluster = new HashMap<>();
+    queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
     queuedWork.put("cluster1", queuedWorkForCluster);
 
     assigner.setQueuedWork(queuedWork);
@@ -361,9 +361,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
 
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
   }
 }
