You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2017/09/06 22:51:55 UTC
[accumulo] 01/02: ACCUMULO-4662 Refactored TableID in replication
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 7b35311b8a27ce3bc3a2c12a8e348ddb7c7cfde6
Author: Mike Miller <mm...@apache.org>
AuthorDate: Wed Sep 6 16:44:03 2017 -0400
ACCUMULO-4662 Refactored TableID in replication
---
.../master/replication/SequentialWorkAssigner.java | 41 +++++++++---------
.../replication/SequentialWorkAssignerTest.java | 10 ++---
.../test/replication/SequentialWorkAssignerIT.java | 48 +++++++++++-----------
3 files changed, 50 insertions(+), 49 deletions(-)
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f273a3f..c07707c 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -53,7 +54,7 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
* }
*/
// @formatter:on
- private Map<String,Map<String,String>> queuedWorkByPeerName;
+ private Map<String,Map<Table.ID,String>> queuedWorkByPeerName;
public SequentialWorkAssigner() {}
@@ -66,11 +67,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
return NAME;
}
- protected Map<String,Map<String,String>> getQueuedWork() {
+ protected Map<String,Map<Table.ID,String>> getQueuedWork() {
return queuedWorkByPeerName;
}
- protected void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+ protected void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
this.queuedWorkByPeerName = queuedWork;
}
@@ -97,11 +98,11 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
Entry<String,ReplicationTarget> entry = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(work);
String filename = entry.getKey();
String peerName = entry.getValue().getPeerName();
- String sourceTableId = entry.getValue().getSourceTableId().canonicalID();
+ Table.ID sourceTableId = entry.getValue().getSourceTableId();
log.debug("In progress replication of {} from table with ID {} to peer {}", filename, sourceTableId, peerName);
- Map<String,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
+ Map<Table.ID,String> replicationForPeer = queuedWorkByPeerName.get(peerName);
if (null == replicationForPeer) {
replicationForPeer = new HashMap<>();
queuedWorkByPeerName.put(peerName, replicationForPeer);
@@ -116,24 +117,24 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
*/
@Override
protected void cleanupFinishedWork() {
- final Iterator<Entry<String,Map<String,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
+ final Iterator<Entry<String,Map<Table.ID,String>>> queuedWork = queuedWorkByPeerName.entrySet().iterator();
final String instanceId = conn.getInstance().getInstanceID();
int elementsRemoved = 0;
// Check the status of all the work we've queued up
while (queuedWork.hasNext()) {
// {peer -> {tableId -> workKey, tableId -> workKey, ... }, peer -> ...}
- Entry<String,Map<String,String>> workForPeer = queuedWork.next();
+ Entry<String,Map<Table.ID,String>> workForPeer = queuedWork.next();
// TableID to workKey (filename and ReplicationTarget)
- Map<String,String> queuedReplication = workForPeer.getValue();
+ Map<Table.ID,String> queuedReplication = workForPeer.getValue();
- Iterator<Entry<String,String>> iter = queuedReplication.entrySet().iterator();
+ Iterator<Entry<Table.ID,String>> iter = queuedReplication.entrySet().iterator();
// Loop over every target we need to replicate this file to, removing the target when
// the replication task has finished
while (iter.hasNext()) {
// tableID -> workKey
- Entry<String,String> entry = iter.next();
+ Entry<Table.ID,String> entry = iter.next();
// Null equates to the work for this target was finished
if (null == zooCache.get(ZooUtil.getRoot(instanceId) + ReplicationConstants.ZOO_WORK_QUEUE + "/" + entry.getValue())) {
log.debug("Removing {} from work assignment state", entry.getValue());
@@ -153,12 +154,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
@Override
protected boolean shouldQueueWork(ReplicationTarget target) {
- Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+ Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
if (null == queuedWorkForPeer) {
return true;
}
- String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+ String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
// If we have no work for the local table to the given peer, submit some!
return null == queuedWork;
@@ -167,17 +168,17 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
@Override
protected boolean queueWork(Path path, ReplicationTarget target) {
String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
- Map<String,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+ Map<Table.ID,String> workForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
if (null == workForPeer) {
workForPeer = new HashMap<>();
this.queuedWorkByPeerName.put(target.getPeerName(), workForPeer);
}
- String queuedWork = workForPeer.get(target.getSourceTableId().canonicalID());
+ String queuedWork = workForPeer.get(target.getSourceTableId());
if (null == queuedWork) {
try {
workQueue.addWork(queueKey, path.toString());
- workForPeer.put(target.getSourceTableId().canonicalID(), queueKey);
+ workForPeer.put(target.getSourceTableId(), queueKey);
} catch (KeeperException | InterruptedException e) {
log.warn("Could not queue work for {} to {}", path, target, e);
return false;
@@ -195,12 +196,12 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
@Override
protected Set<String> getQueuedWork(ReplicationTarget target) {
- Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+ Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
if (null == queuedWorkForPeer) {
return Collections.emptySet();
}
- String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+ String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
if (null == queuedWork) {
return Collections.emptySet();
} else {
@@ -210,15 +211,15 @@ public class SequentialWorkAssigner extends DistributedWorkQueueWorkAssigner {
@Override
protected void removeQueuedWork(ReplicationTarget target, String queueKey) {
- Map<String,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
+ Map<Table.ID,String> queuedWorkForPeer = this.queuedWorkByPeerName.get(target.getPeerName());
if (null == queuedWorkForPeer) {
log.warn("removeQueuedWork called when no work was queued for {}", target.getPeerName());
return;
}
- String queuedWork = queuedWorkForPeer.get(target.getSourceTableId().canonicalID());
+ String queuedWork = queuedWorkForPeer.get(target.getSourceTableId());
if (queuedWork.equals(queueKey)) {
- queuedWorkForPeer.remove(target.getSourceTableId().canonicalID());
+ queuedWorkForPeer.remove(target.getSourceTableId());
} else {
log.warn("removeQueuedWork called on {} with differing queueKeys, expected {} but was {}", target, queueKey, queuedWork);
return;
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 26a090a..38c63f6 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -56,12 +56,12 @@ public class SequentialWorkAssignerTest {
ZooCache zooCache = createMock(ZooCache.class);
Instance inst = createMock(Instance.class);
- Map<String,Map<String,String>> queuedWork = new TreeMap<>();
- Map<String,String> cluster1Work = new TreeMap<>();
+ Map<String,Map<Table.ID,String>> queuedWork = new TreeMap<>();
+ Map<Table.ID,String> cluster1Work = new TreeMap<>();
// Two files for cluster1, one for table '1' and another for table '2' we havce assigned work for
- cluster1Work.put("1", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
- cluster1Work.put("2", DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
+ cluster1Work.put(Table.ID.of("1"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file1", new ReplicationTarget("cluster1", "1", Table.ID.of("1"))));
+ cluster1Work.put(Table.ID.of("2"), DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))));
queuedWork.put("cluster1", cluster1Work);
@@ -90,6 +90,6 @@ public class SequentialWorkAssignerTest {
Assert.assertEquals(1, cluster1Work.size());
Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey("file2", new ReplicationTarget("cluster1", "2", Table.ID.of("2"))),
- cluster1Work.get("2"));
+ cluster1Work.get(Table.ID.of("2")));
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
index 2a7b853..7b2055f 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/SequentialWorkAssignerIT.java
@@ -62,7 +62,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
}
@Override
- public void setQueuedWork(Map<String,Map<String,String>> queuedWork) {
+ public void setQueuedWork(Map<String,Map<Table.ID,String>> queuedWork) {
super.setQueuedWork(queuedWork);
}
@@ -137,7 +137,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
bw.close();
DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
assigner.setQueuedWork(queuedWork);
assigner.setWorkQueue(workQueue);
assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -156,10 +156,10 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
Assert.assertEquals(1, queuedWork.size());
Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target), cluster1Work.get(target.getSourceTableId()));
}
@Test
@@ -199,7 +199,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
bw.close();
DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
assigner.setQueuedWork(queuedWork);
assigner.setWorkQueue(workQueue);
assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -222,13 +222,13 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
Assert.assertEquals(1, queuedWork.size());
Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
Assert.assertEquals(2, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
- Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster1Work.containsKey(target2.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster1Work.get(target2.getSourceTableId()));
}
@Test
@@ -268,7 +268,7 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
bw.close();
DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
+ Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
assigner.setQueuedWork(queuedWork);
assigner.setWorkQueue(workQueue);
assigner.setMaxQueueSize(Integer.MAX_VALUE);
@@ -291,15 +291,15 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
Assert.assertEquals(2, queuedWork.size());
Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster1Work.containsKey(target1.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target1), cluster1Work.get(target1.getSourceTableId()));
- Map<String,String> cluster2Work = queuedWork.get("cluster2");
+ Map<Table.ID,String> cluster2Work = queuedWork.get("cluster2");
Assert.assertEquals(1, cluster2Work.size());
- Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster2Work.containsKey(target2.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target2), cluster2Work.get(target2.getSourceTableId()));
}
@Test
@@ -338,9 +338,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
// Treat filename1 as we have already submitted it for replication
- Map<String,Map<String,String>> queuedWork = new HashMap<>();
- Map<String,String> queuedWorkForCluster = new HashMap<>();
- queuedWorkForCluster.put(target.getSourceTableId().canonicalID(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
+ Map<String,Map<Table.ID,String>> queuedWork = new HashMap<>();
+ Map<Table.ID,String> queuedWorkForCluster = new HashMap<>();
+ queuedWorkForCluster.put(target.getSourceTableId(), DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename1, target));
queuedWork.put("cluster1", queuedWorkForCluster);
assigner.setQueuedWork(queuedWork);
@@ -361,9 +361,9 @@ public class SequentialWorkAssignerIT extends ConfigurableMacBase {
Assert.assertEquals(1, queuedWork.size());
Assert.assertTrue(queuedWork.containsKey("cluster1"));
- Map<String,String> cluster1Work = queuedWork.get("cluster1");
+ Map<Table.ID,String> cluster1Work = queuedWork.get("cluster1");
Assert.assertEquals(1, cluster1Work.size());
- Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId().canonicalID()));
- Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId().canonicalID()));
+ Assert.assertTrue(cluster1Work.containsKey(target.getSourceTableId()));
+ Assert.assertEquals(DistributedWorkQueueWorkAssignerHelper.getQueueKey(filename2, target), cluster1Work.get(target.getSourceTableId()));
}
}
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.