You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/23 00:37:45 UTC

[1/3] git commit: ACCUMULO-378 Need to turn on Trace for traces to be logged.

Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 e798d5008 -> 0f6b6d0fb


ACCUMULO-378 Need to turn on Trace for traces to be logged.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4288bbe4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4288bbe4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4288bbe4

Branch: refs/heads/ACCUMULO-378
Commit: 4288bbe442394af8512cd0d801dbb98d76d10b46
Parents: e798d50
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 17:15:00 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 17:15:29 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/replication/ReplicationDriver.java   | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4288bbe4/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 3069c97..b340009 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -24,6 +24,9 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.master.Master;
+import org.apache.accumulo.trace.instrument.CountSampler;
+import org.apache.accumulo.trace.instrument.Sampler;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.log4j.Logger;
 
 /**
@@ -50,6 +53,8 @@ public class ReplicationDriver extends Daemon {
 
   @Override
   public void run() {
+    Sampler sampler = new CountSampler(10);
+
     while (master.stillMaster()) {
       if (null == workMaker) {
         try {
@@ -67,6 +72,10 @@ public class ReplicationDriver extends Daemon {
         rcrr = new RemoveCompleteReplicationRecords(conn);
       }
 
+      if (sampler.next()) {
+        Trace.on("masterReplicationDriver");
+      }
+
       // Make status markers from replication records in metadata, removing entries in
       // metadata which are no longer needed (closed records)
       // This will end up creating the replication table too
@@ -83,6 +92,8 @@ public class ReplicationDriver extends Daemon {
       // So it's important that we run these sequentially and not concurrently
       rcrr.run();
 
+      Trace.offNoFlush();
+
       // Sleep for a bit
       UtilWaitThread.sleep(conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL));
     }


[2/3] git commit: ACCUMULO-378 Better logging to whom the master is connecting

Posted by el...@apache.org.
ACCUMULO-378 Better logging to whom the master is connecting


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/327b0abc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/327b0abc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/327b0abc

Branch: refs/heads/ACCUMULO-378
Commit: 327b0abc0dfcdf17422effd2f797fcfc519b6373
Parents: 4288bbe
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:32:53 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:32:53 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/client/impl/ReplicationClient.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/327b0abc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 6e36759..02ae3d0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -83,6 +83,8 @@ public class ReplicationClient {
     String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR;
     String replCoordinatorAddr;
 
+    log.debug("Using ZooKeeper quorum at {} with path {} to find peer Master information", instance.getZooKeepers(), zkPath);
+
     // Get the coordinator port for the master we're trying to connect to
     try {
       ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
@@ -95,6 +97,8 @@ public class ReplicationClient {
     // Throw the hostname and port through HostAndPort to get some normalization
     HostAndPort coordinatorAddr = HostAndPort.fromString(replCoordinatorAddr);
 
+    log.debug("Connecting to master at {}", coordinatorAddr.toString());
+
     try {
       // Master requests can take a long time: don't ever time out
       ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(),


[3/3] git commit: ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.

Posted by el...@apache.org.
ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f6b6d0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f6b6d0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f6b6d0f

Branch: refs/heads/ACCUMULO-378
Commit: 0f6b6d0fb091440ad4780db1cf3c11f4a606de59
Parents: 327b0ab
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:33:11 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:33:11 2014 -0400

----------------------------------------------------------------------
 .../replication/ReplicationProcessor.java       | 56 ++++++++++++--------
 .../replication/ReplicationProcessorTest.java   | 49 +++++++++++++++++
 2 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index d451991..ab939c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -119,27 +119,9 @@ public class ReplicationProcessor implements Processor {
     }
 
     log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
-
-    // Replicate that sucker
-    Status replicatedStatus = replica.replicate(filePath, status, target);
-
-    log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-
-    // If we got a different status
-    if (!replicatedStatus.equals(status)) {
-      // We actually did some work!
-      recordNewStatus(filePath, replicatedStatus, target);
-      return;
-    }
-
-    log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
-        TextFormat.shortDebugString(replicatedStatus));
-
-    // otherwise, we didn't actually replicate because there was error sending the data
-    // we can just not record any updates, and it will be picked up again by the work assigner
   }
 
-  public String getPeerType(String peerName) {
+  protected String getPeerType(String peerName) {
     // Find the configured replication peer so we know how to replicate to it
     Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
     String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
@@ -152,7 +134,7 @@ public class ReplicationProcessor implements Processor {
     return peerType;
   }
 
-  public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+  protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
     s.setRange(Range.exact(file));
@@ -161,6 +143,38 @@ public class ReplicationProcessor implements Processor {
     return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
   }
 
+  /**
+   * Replicate the file to the peer, looping for as long as each attempt changes the Status and StatusUtil still reports work required.
+   *
+   * @param replica implementation used to send the data to the peer
+   * @param filePath file to replicate
+   * @param status Status handed to the first replication attempt
+   * @param target target passed through to the replica and to recordNewStatus
+   */
+  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
+    while (true) {
+      // Replicate that sucker
+      Status replicatedStatus = replica.replicate(filePath, status, target);
+
+      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
+      // Compare with the Status just sent (not an older one), so a stalled retry is never recorded as progress
+      if (replicatedStatus.equals(status)) {
+        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
+            TextFormat.shortDebugString(replicatedStatus));
+        // Nothing was replicated: record no update and leave the file to be picked up again by the work assigner
+        return;
+      }
+
+      // We actually did some work!
+      recordNewStatus(filePath, replicatedStatus, target);
+      if (!StatusUtil.isWorkRequired(replicatedStatus)) {
+        return;
+      }
+      status = replicatedStatus; // more work remains: the next attempt resumes from the new Status
+    }
+  }
+
   /**
    * Record the updated Status for this file and target
    * 
@@ -171,7 +185,7 @@ public class ReplicationProcessor implements Processor {
    * @param target
    *          Peer that was replicated to
    */
-  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+  protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
     try {
       Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
       BatchWriter bw = ReplicationTable.getBatchWriter(conn);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index df4845c..c88e091 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -20,11 +20,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -65,4 +69,49 @@ public class ReplicationProcessorTest {
 
     proc.getPeerType("foo");
   }
+
+  @Test
+  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+    // Only recordNewStatus is mocked on proc; the replicate() call under test is the real method
+    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Path path = new Path("/accumulo");
+    // Statuses the replica returns on successive attempts: begin moves to 100, then to Long.MAX_VALUE
+    Status firstStatus = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Status secondStatus = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    // First attempt: the initial status is sent, firstStatus comes back and must be recorded once
+    EasyMock.expect(replica.replicate(path, status, target)).andReturn(firstStatus);
+    proc.recordNewStatus(path, firstStatus, target);
+    EasyMock.expectLastCall().once();
+    // Second attempt: firstStatus is sent, secondStatus comes back and must be recorded once
+    EasyMock.expect(replica.replicate(path, firstStatus, target)).andReturn(secondStatus);
+    proc.recordNewStatus(path, secondStatus, target);
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(replica, proc);
+    
+    proc.replicate(replica, path, status, target);
+
+    EasyMock.verify(replica, proc);
+  }
+
+  @Test
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
+    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+    // Only recordNewStatus is mocked on proc; the replicate() call under test is the real method
+    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+    Path path = new Path("/accumulo");
+    // The replica hands back the very status it was given, i.e. the attempt replicated nothing new
+    EasyMock.expect(replica.replicate(path, status, target)).andReturn(status);
+    // No recordNewStatus call and no second replicate call is expected; either one is an unexpected call on the mocks
+    EasyMock.replay(replica, proc);
+    
+    proc.replicate(replica, path, status, target);
+
+    EasyMock.verify(replica, proc);
+  }
 }