You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/23 00:37:47 UTC

[3/3] git commit: ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.

ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f6b6d0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f6b6d0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f6b6d0f

Branch: refs/heads/ACCUMULO-378
Commit: 0f6b6d0fb091440ad4780db1cf3c11f4a606de59
Parents: 327b0ab
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:33:11 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:33:11 2014 -0400

----------------------------------------------------------------------
 .../replication/ReplicationProcessor.java       | 56 ++++++++++++--------
 .../replication/ReplicationProcessorTest.java   | 49 +++++++++++++++++
 2 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index d451991..ab939c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -119,27 +119,9 @@ public class ReplicationProcessor implements Processor {
     }
 
     log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
-
-    // Replicate that sucker
-    Status replicatedStatus = replica.replicate(filePath, status, target);
-
-    log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-
-    // If we got a different status
-    if (!replicatedStatus.equals(status)) {
-      // We actually did some work!
-      recordNewStatus(filePath, replicatedStatus, target);
-      return;
-    }
-
-    log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
-        TextFormat.shortDebugString(replicatedStatus));
-
-    // otherwise, we didn't actually replicate because there was error sending the data
-    // we can just not record any updates, and it will be picked up again by the work assigner
   }
 
-  public String getPeerType(String peerName) {
+  protected String getPeerType(String peerName) {
     // Find the configured replication peer so we know how to replicate to it
     Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
     String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
@@ -152,7 +134,7 @@ public class ReplicationProcessor implements Processor {
     return peerType;
   }
 
-  public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+  protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
     s.setRange(Range.exact(file));
@@ -161,6 +143,38 @@ public class ReplicationProcessor implements Processor {
     return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
   }
 
+  /**
+   * Replicate the file to the target, repeating for as long as each attempt makes progress and more work remains.
+   * Every changed Status is persisted via recordNewStatus; an attempt returning the Status it was given ends the loop.
+   */
+  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
+    while (true) {
+      // Replicate that sucker
+      Status replicatedStatus = replica.replicate(filePath, status, target);
+
+      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
+      // Compare against the Status we just sent rather than an older baseline: a stale baseline makes a
+      // no-progress attempt look like progress, re-recording the same Status and retrying one extra time
+      if (!replicatedStatus.equals(status)) {
+        // We actually did some work!
+        recordNewStatus(filePath, replicatedStatus, target);
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(replicatedStatus)) {
+          return;
+        }
+        // Otherwise, let it loop and replicate some more data, resuming from the Status we just recorded
+        status = replicatedStatus;
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
+            TextFormat.shortDebugString(replicatedStatus));
+
+        // otherwise, we didn't actually replicate because there was an error sending the data
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return;
+      }
+    }
+  }
+
   /**
    * Record the updated Status for this file and target
    * 
@@ -171,7 +185,7 @@ public class ReplicationProcessor implements Processor {
    * @param target
    *          Peer that was replicated to
    */
-  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+  protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
     try {
       Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
       BatchWriter bw = ReplicationTable.getBatchWriter(conn);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index df4845c..c88e091 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -20,11 +20,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -65,4 +69,49 @@ public class ReplicationProcessorTest {
 
     proc.getPeerType("foo");
   }
+
+  @Test
+  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+    // Fixture: the Status handed to replicate() has begin=0, an infinite end, and is closed
+    Path file = new Path("/accumulo");
+    ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+    Status initial = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    // The two Statuses the mocked replica returns in turn: begin=100, then begin=Long.MAX_VALUE
+    Status afterFirstPass = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Status afterSecondPass = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+    ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    // First pass starts from the initial Status, and its result is expected to be recorded
+    EasyMock.expect(replicaSystem.replicate(file, initial, peer)).andReturn(afterFirstPass);
+    processor.recordNewStatus(file, afterFirstPass, peer);
+    EasyMock.expectLastCall().once();
+    // Second pass is expected to resume from the Status returned by the first pass
+    EasyMock.expect(replicaSystem.replicate(file, afterFirstPass, peer)).andReturn(afterSecondPass);
+    processor.recordNewStatus(file, afterSecondPass, peer);
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(replicaSystem, processor);
+    processor.replicate(replicaSystem, file, initial, peer);
+    EasyMock.verify(replicaSystem, processor);
+  }
+
+  @Test
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
+    Path file = new Path("/accumulo");
+    ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+    Status unchanged = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+    ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+    // The mocked replica hands back the very Status it was given; no recordNewStatus expectation is recorded
+    EasyMock.expect(replicaSystem.replicate(file, unchanged, peer)).andReturn(unchanged);
+
+    EasyMock.replay(replicaSystem, processor);
+    // Only the single replicate call recorded above is expected while this runs
+    processor.replicate(replicaSystem, file, unchanged, peer);
+    EasyMock.verify(replicaSystem, processor);
+  }
 }