You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/23 00:37:47 UTC
[3/3] git commit: ACCUMULO-2834 First attempt at immediately
requeueing a file for replication when more work is needed.
ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f6b6d0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f6b6d0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f6b6d0f
Branch: refs/heads/ACCUMULO-378
Commit: 0f6b6d0fb091440ad4780db1cf3c11f4a606de59
Parents: 327b0ab
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:33:11 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:33:11 2014 -0400
----------------------------------------------------------------------
.../replication/ReplicationProcessor.java | 56 ++++++++++++--------
.../replication/ReplicationProcessorTest.java | 49 +++++++++++++++++
2 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index d451991..ab939c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -119,27 +119,9 @@ public class ReplicationProcessor implements Processor {
}
log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
-
- // Replicate that sucker
- Status replicatedStatus = replica.replicate(filePath, status, target);
-
- log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-
- // If we got a different status
- if (!replicatedStatus.equals(status)) {
- // We actually did some work!
- recordNewStatus(filePath, replicatedStatus, target);
- return;
- }
-
- log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
- TextFormat.shortDebugString(replicatedStatus));
-
- // otherwise, we didn't actually replicate because there was error sending the data
- // we can just not record any updates, and it will be picked up again by the work assigner
}
- public String getPeerType(String peerName) {
+ protected String getPeerType(String peerName) {
// Find the configured replication peer so we know how to replicate to it
Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
@@ -152,7 +134,7 @@ public class ReplicationProcessor implements Processor {
return peerType;
}
- public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+ protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
InvalidProtocolBufferException {
Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
s.setRange(Range.exact(file));
@@ -161,6 +143,38 @@ public class ReplicationProcessor implements Processor {
return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
}
+  /**
+   * Replicate the given file to the target, repeating for as long as each attempt makes progress and more work remains.
+   * <p>
+   * Every attempt is handed the Status returned by the previous attempt (the caller's Status for the first attempt). A returned Status that differs from
+   * the one handed in is passed to {@link #recordNewStatus(Path, Status, ReplicationTarget)}; a returned Status equal to the one handed in ends the loop
+   * without recording anything.
+   *
+   * @param replica
+   *          ReplicaSystem used to replicate the file
+   * @param filePath
+   *          File to replicate
+   * @param status
+   *          Status of the file before the first replication attempt
+   * @param target
+   *          Peer to replicate to
+   */
+  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
+    while (true) {
+      // Replicate that sucker
+      Status replicatedStatus = replica.replicate(filePath, status, target);
+
+      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
+      // Compare against the Status this attempt started from. Comparing against the Status from one attempt earlier makes a stalled
+      // attempt look like progress: the unchanged Status gets recorded a second time and one more replication attempt is made.
+      if (replicatedStatus.equals(status)) {
+        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
+            TextFormat.shortDebugString(replicatedStatus));
+
+        // we didn't actually replicate because there was an error sending the data
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return;
+      }
+
+      // We actually did some work!
+      recordNewStatus(filePath, replicatedStatus, target);
+
+      // If we don't have any more work, just quit
+      if (!StatusUtil.isWorkRequired(replicatedStatus)) {
+        return;
+      }
+
+      // Otherwise, loop and replicate some more data, starting from where this attempt left off
+      status = replicatedStatus;
+    }
+  }
+
/**
* Record the updated Status for this file and target
*
@@ -171,7 +185,7 @@ public class ReplicationProcessor implements Processor {
* @param target
* Peer that was replicated to
*/
- public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+ protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
try {
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index df4845c..c88e091 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -20,11 +20,15 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -65,4 +69,49 @@ public class ReplicationProcessorTest {
proc.getPeerType("foo");
}
+
+  /**
+   * A replication pass that returns a changed Status is recorded and followed by another pass that starts from the Status just returned.
+   */
+  @Test
+  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+    // Fixtures: the file, the peer, and the Status before and after each pass
+    Path file = new Path("/accumulo");
+    ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+    Status initial = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Status afterFirstPass = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Status afterSecondPass = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+    // Only recordNewStatus is mocked on the processor; replicate() itself runs the real code
+    ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    // First pass: starts from the initial Status, makes progress, and the new Status is recorded
+    EasyMock.expect(replicaSystem.replicate(file, initial, peer)).andReturn(afterFirstPass);
+    processor.recordNewStatus(file, afterFirstPass, peer);
+    EasyMock.expectLastCall().once();
+
+    // Second pass: starts from the first pass' result. No further calls are expected afterwards
+    // (presumably StatusUtil.isWorkRequired is false for a begin of Long.MAX_VALUE -- TODO confirm)
+    EasyMock.expect(replicaSystem.replicate(file, afterFirstPass, peer)).andReturn(afterSecondPass);
+    processor.recordNewStatus(file, afterSecondPass, peer);
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(replicaSystem, processor);
+
+    processor.replicate(replicaSystem, file, initial, peer);
+
+    EasyMock.verify(replicaSystem, processor);
+  }
+
+  /**
+   * A replication pass that hands back the very Status it was given must not record anything and must not trigger another pass.
+   */
+  @Test
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
+    // Fixtures: the file, the peer, and the Status that will come back unchanged
+    Path file = new Path("/accumulo");
+    ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+    Status unchanged = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+    // Only recordNewStatus is mocked on the processor; no call to it is expected in this test
+    ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    // The single expected pass returns exactly the Status it was given
+    EasyMock.expect(replicaSystem.replicate(file, unchanged, peer)).andReturn(unchanged);
+
+    EasyMock.replay(replicaSystem, processor);
+
+    processor.replicate(replicaSystem, file, unchanged, peer);
+
+    EasyMock.verify(replicaSystem, processor);
+  }
}