You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2017/09/06 22:51:54 UTC

[accumulo] branch master updated (10dfc20 -> 953c559)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from 10dfc20  Merge branch '1.8'
     new 7b35311  ACCUMULO-4662 Refactored TableID in replication
     new 953c559  ACCUMULO-4686 Formatting update from verify

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/accumulo/server/AccumuloTest.java   |  3 +-
 .../master/replication/SequentialWorkAssigner.java | 41 +++++++++---------
 .../replication/SequentialWorkAssignerTest.java    | 10 ++---
 .../test/replication/SequentialWorkAssignerIT.java | 48 +++++++++++-----------
 4 files changed, 52 insertions(+), 50 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.

[accumulo] 01/02: ACCUMULO-4662 Refactored TableID in replication

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 7b35311b8a27ce3bc3a2c12a8e348ddb7c7cfde6
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Sep 6 16:44:03 2017 -0400

    ACCUMULO-4662 Refactored TableID in replication
---
 .../master/replication/SequentialWorkAssigner.java | 41 +++++++++---------
 .../replication/SequentialWorkAssignerTest.java    | 10 ++---
 .../test/replication/SequentialWorkAssignerIT.java | 48 +++++++++++-----------
 3 files changed, 50 insertions(+), 49 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f273a3f..c07707c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.Table;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.replication.ReplicationConstants;
 import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -53,7 +54,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
    * }
    */
   // @formatter:on
-  private Map<String,Map<String,String>> queuedWorkByPeerName;
+  private Map<String,Map<Table.ID,String>> queuedWorkByPeerName;
 
   public SequentialWorkAssigner() {}
 
@@ -66,11 +67,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
     return NAME;
   }
 
-  protected Map<String,Map<String,String>> getQueuedWork() {
+  protected Map<String,Map<Table.ID,String>> getQueuedWork() {
     return queuedWorkByPeerName;
   }
 
-  protected void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+  protected void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
     this.queuedWorkByPeerName = queuedWork;
   }
 
@@ -97,11 +98,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
       Entry<String,ReplicationTarget> entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work);
       String filename = entry.getKey();
       String peerName = entry.getValue().getPeerName();
-      String sourceTableId = entry.getValue().getSourceTableId().canonicalID();
+      Table.ID sourceTableId = entry.getValue().getSourceTableId();
 
       log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName);
 
-      Map<String,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
+      Map<Table.ID,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
       if (null == replicationForPeer) {
         replicationForPeer = new HashMap<>();
         queuedWorkByPeerName.put(peerName, replicationForPeer);
@@ -116,24 +117,24 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
    */
   @Override
   protected void cleanupFinishedWork() {
-    final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
+    final Iterator<Entry<String,Map<Table.ID,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
     final String instanceId = conn.getInstance().getInstanceID();
 
     int elementsRemoved = 0;
     // Check the status of all the work we've queued up
     while (queuedWork.hasNext()) {
       // {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...}
-      Entry<String,Map<String,String>> workForPeer = queuedWork.next();
+      Entry<String,Map<Table.ID,String>> workForPeer = queuedWork.next();
 
       // TableID to workKey (filename and ReplicationTarget)
-      Map<String,String> queuedReplication = workForPeer.getValue();
+      Map<Table.ID,String> queuedReplication = workForPeer.getValue();
 
-      Iterator<Entry<String,String>> iter = queuedReplication.entrySet().iterator();
+      Iterator<Entry<Table.ID,String>> iter = queuedReplication.entrySet().iterator();
       // Loop over every target we need to replicate this file to, removing the target when
       // the replication task has finished
       while (iter.hasNext()) {
         // tableID -> workKey
-        Entry<String,String> entry = iter.next();
+        Entry<Table.ID,String> entry = iter.next();
         // Null equates to the work for this target was finished
         if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + entry.getValue())) {
           log.debug("Removing {} from work assignment state", entry.getValue());
@@ -153,12 +154,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected boolean shouldQueueWork(ReplicationTarget target) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       return true;
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
 
     // If we have no work for the local table to the given peer, submit some!
     return null == queuedWork;
@@ -167,17 +168,17 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
   @Override
   protected boolean queueWork(Path path, ReplicationTarget target) {
     String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
-    Map<String,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == workForPeer) {
       workForPeer = new HashMap<>();
       this.queuedWorkByPeerName.put(target.getPeerName(), workForPeer);
     }
 
-    String queuedWork = workForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = workForPeer.get(target.getSourceTableId());
     if (null == queuedWork) {
       try {
         workQueue.addWork(queueKey, path.toString());
-        workForPeer.put(target.getSourceTableId().canonicalID(), queueKey);
+        workForPeer.put(target.getSourceTableId(), queueKey);
       } catch (KeeperException | InterruptedException e) {
         log.warn("Could not queue work for {} to {}", path, target, e);
         return false;
@@ -195,12 +196,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected Set<String> getQueuedWork(ReplicationTarget target) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       return Collections.emptySet();
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
     if (null == queuedWork) {
       return Collections.emptySet();
     } else {
@@ -210,15 +211,15 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
 
   @Override
   protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
-    Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+    Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
     if (null == queuedWorkForPeer) {
       log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName());
       return;
     }
 
-    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+    String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
     if (queuedWork.equals(queueKey)) {
-      queuedWorkForPeer.remove(target.getSourceTableId().canonicalID());
+      queuedWorkForPeer.remove(target.getSourceTableId());
     } else {
       log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork);
       return;
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 26a090a..38c63f6 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -56,12 +56,12 @@ public class SequentialWorkAssignerTest {
     ZooCache zooCache = createMock(ZooCache.class);
     Instance inst = createMock(Instance.class);
 
-    Map<String,Map<String,String>> queuedWork = new TreeMap<>();
-    Map<String,String> cluster1Work = new TreeMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new TreeMap<>();
+    Map<Table.ID,String> cluster1Work = new TreeMap<>();
 
     // Two files for cluster1, one for table '1' and another for table '2' we havce assigned work for
-    cluster1Work.put("1", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
-    cluster1Work.put("2", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
+    cluster1Work.put(Table.ID.of("1"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
+    cluster1Work.put(Table.ID.of("2"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
 
     queuedWork.put("cluster1", cluster1Work);
 
@@ -90,6 +90,6 @@ public class SequentialWorkAssignerTest {
 
     Assert.assertEquals(1, cluster1Work.size());
     Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))),
-        cluster1Work.get("2"));
+        cluster1Work.get(Table.ID.of("2")));
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
index 2a7b853..7b2055f 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
@@ -62,7 +62,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     }
 
     @Override
-    public void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+    public void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
       super.setQueuedWork(queuedWork);
     }
 
@@ -137,7 +137,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -156,10 +156,10 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
 
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
   }
 
   @Test
@@ -199,7 +199,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -222,13 +222,13 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
 
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(2, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
-    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -268,7 +268,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     bw.close();
 
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
     assigner.setQueuedWork(queuedWork);
     assigner.setWorkQueue(workQueue);
     assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -291,15 +291,15 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     Assert.assertEquals(2, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
 
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
 
-    Map<String,String> cluster2Work = queuedWork.get("cluster2");
+    Map<Table.ID,String> cluster2Work = queuedWork.get("cluster2");
     Assert.assertEquals(1, cluster2Work.size());
-    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
   }
 
   @Test
@@ -338,9 +338,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
     DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
 
     // Treat filename1 as we have already submitted it for replication
-    Map<String,Map<String,String>> queuedWork = new HashMap<>();
-    Map<String,String> queuedWorkForCluster = new HashMap<>();
-    queuedWorkForCluster.put(target.getSourceTableId().canonicalID(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
+    Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
+    Map<Table.ID,String> queuedWorkForCluster = new HashMap<>();
+    queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
     queuedWork.put("cluster1", queuedWorkForCluster);
 
     assigner.setQueuedWork(queuedWork);
@@ -361,9 +361,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
 
     Assert.assertEquals(1, queuedWork.size());
     Assert.assertTrue(queuedWork.containsKey("cluster1"));
-    Map<String,String> cluster1Work = queuedWork.get("cluster1");
+    Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
     Assert.assertEquals(1, cluster1Work.size());
-    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
-    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+    Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+    Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
   }
 }

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.

[accumulo] 02/02: ACCUMULO-4686 Formatting update from verify

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 953c559c72eb6ccf484171e08a677c6db45bc33b
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Sep 6 18:49:48 2017 -0400

    ACCUMULO-4686 Formatting update from verify
---
 server/base/src/test/java/org/apache/accumulo/server/AccumuloTest.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/base/src/test/java/org/apache/accumulo/server/AccumuloTest.java b/server/base/src/test/java/org/apache/accumulo/server/AccumuloTest.java
index aba9b36..e43f5a0 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/AccumuloTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/AccumuloTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.FileNotFoundException;
 
-import com.google.common.collect.Sets;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -33,6 +32,8 @@ import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
 public class AccumuloTest {
   private FileSystem fs;
   private Path path;

-- 
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.