You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this text version.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/28 02:40:49 UTC

[09/13] git commit: ACCUMULO-378 Add tracing into the AccumuloReplicaSystem.

ACCUMULO-378 Add tracing into the AccumuloReplicaSystem.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a619ffe
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a619ffe
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a619ffe

Branch: refs/heads/ACCUMULO-378
Commit: 3a619ffe08d8d90432218a3138faa572017b1f06
Parents: 9dec25e
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 18:20:31 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 18:20:31 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 53 ++++++++++++++++++--
 1 file changed, 48 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a619ffe/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index ce44eef..4051daf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -58,6 +58,8 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.logger.LogFileKey;
@@ -150,6 +152,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
     final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 
+    Trace.on("AccumuloReplicaSystem");
+
     Instance peerInstance = getPeerInstance(target);
     // Remote identifier is an integer (table id) in this case.
     final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
@@ -159,6 +163,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
     for (int i = 0; i < numAttempts; i++) {
       String peerTserver;
+      Span span = Trace.start("Fetch peer tserver");
       try {
         // Ask the master on the remote what TServer we should talk with to replicate the data
         peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
@@ -173,6 +178,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
         // No progress is made
         log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
         continue;
+      } finally {
+        span.stop();
       }
 
       if (null == peerTserver) {
@@ -186,9 +193,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
       try {
         if (p.getName().endsWith(RFILE_SUFFIX)) {
-          finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          span = Trace.start("RFile replication");
+          try {
+            finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          } finally {
+            span.stop();
+          }
         } else {
-          finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          span = Trace.start("WAL replication");
+          try {
+            finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          } finally {
+            span.stop();
+          }
         }
 
         log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
@@ -202,6 +219,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
+    Trace.offNoFlush();
+
     // We made no status, punt on it for now, and let it re-queue itself for work
     return status;
   }
@@ -258,14 +277,20 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     final Set<Integer> tids;
     final DataInputStream input;
+    Span span = Trace.start("Read WAL header");
+    span.data("file", p.toString());
     try {
       input = getWalStream(p);
     } catch (IOException e) {
       log.error("Could not create stream for WAL", e);
       // No data sent (bytes nor records) and no progress made
       return status;
+    } finally {
+      span.stop();
     }
 
+    span = Trace.start("Consume WAL prefix");
+    span.data("file", p.toString());
     try {
       // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
       // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
@@ -273,13 +298,28 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     } catch (IOException e) {
       log.warn("Unexpected error consuming file.");
       return status;
+    } finally {
+      span.stop();
     }
 
     Status lastStatus = status, currentStatus = status;
     while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-          new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+      // Set some trace info
+      span = Trace.start("Replicate WAL batch");
+      span.data("Size limit", Long.toString(sizeLimit));
+      span.data("File", p.toString());
+      span.data("Peer instance name", peerInstance.getInstanceName());
+      span.data("Peer tserver", peerTserver);
+      span.data("Remote table ID", Integer.toString(remoteTableId));
+
+      ReplicationStats replResult;
+      try {
+        // Read and send a batch of mutations
+        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+            new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+      } finally {
+        span.stop();
+      }
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -293,11 +333,14 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
       // If we got a different status
       if (!currentStatus.equals(lastStatus)) {
+        span = Trace.start("Update replication table");
         try {
           helper.recordNewStatus(p, currentStatus, target);
         } catch (TableNotFoundException e) {
           log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
           throw new RuntimeException("Replication table did not exist, will retry", e);
+        } finally {
+          span.stop();
         }
 
         // If we don't have any more work, just quit