You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/28 02:40:41 UTC

[01/13] git commit: ACCUMULO-2846 Push down the DataInputStream and immediate re-queue'ing of a file with more work to AccumuloReplicaSystem.

Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 9d9b5ed24 -> fa18d9dcf


ACCUMULO-2846 Push down the DataInputStream and immediate re-queue'ing of a file with more work to AccumuloReplicaSystem.

Handling the quick re-submission of a file with more data to replicate in the ReplicationProcessor
was a nice approach as it separated the business logic from the implementation; however, this caused
a noticeable performance decrease in re-reading the prefix of already replicated data from the source file.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f275353e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f275353e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f275353e

Branch: refs/heads/ACCUMULO-378
Commit: f275353ebd9052a219f9e14097c652d11810e8c9
Parents: 9d9b5ed
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 14:05:39 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 14:05:39 2014 -0400

----------------------------------------------------------------------
 .../core/client/replication/ReplicaSystem.java  |   3 +-
 .../client/replication/ReplicationTable.java    |  37 +++
 .../replication/PrintReplicationRecords.java    |   2 +-
 .../core/replication/ReplicaSystemHelper.java   |  71 ++++
 .../server/replication/ReplicationTable.java    |  35 +-
 .../master/replication/FinishedWorkUpdater.java |   6 +-
 .../replication/SequentialWorkAssigner.java     |   2 +
 .../replication/AccumuloReplicaSystem.java      | 326 ++++++++++++++-----
 .../replication/ReplicationProcessor.java       |  98 ++----
 .../replication/AccumuloReplicaSystemTest.java  |  85 ++++-
 .../replication/ReplicationProcessorTest.java   |  43 +--
 .../test/replication/MockReplicaSystem.java     |  23 +-
 .../test/replication/ReplicationTest.java       |  62 +---
 13 files changed, 519 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
index 220d7bb..e20d35f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.core.client.replication;
 
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.hadoop.fs.Path;
@@ -32,7 +33,7 @@ public interface ReplicaSystem {
    * @param target The peer
    * @return A new Status for the progress that was made
    */
-  public Status replicate(Path p, Status status, ReplicationTarget target);
+  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper);
 
   /**
    * Configure the implementation with necessary information from the system configuration

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
index 6bd34f9..0b2b9a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicationTable.java
@@ -16,9 +16,46 @@
  */
 package org.apache.accumulo.core.client.replication;
 
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.security.Authorizations;
+
 /**
  * 
  */
 public class ReplicationTable {
   public static final String NAME = "replication";
+
+  public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
+    return conn.createScanner(NAME, auths);
+  }
+
+  public static Scanner getScanner(Connector conn) throws TableNotFoundException {
+    return getScanner(conn, new Authorizations());
+  }
+
+  public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
+    return getBatchWriter(conn, new BatchWriterConfig());
+  }
+
+  public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
+    return conn.createBatchWriter(NAME, config);
+  }
+
+  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
+    return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
+  }
+
+  public static boolean exists(Connector conn) {
+    return exists(conn.tableOperations());
+  }
+
+  public static boolean exists(TableOperations tops) {
+    return tops.exists(NAME);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
index bb98440..5104d39 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java
@@ -77,7 +77,7 @@ public class PrintReplicationRecords implements Runnable {
     }
 
     out.println();
-    out.println(sdf.format(new Date()) + "Replication entries from replication table");
+    out.println(sdf.format(new Date()) + " Replication entries from replication table");
     out.println("--------------------------------------------------------------------");
 
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
new file mode 100644
index 0000000..660862c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicaSystemHelper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicationTable;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class ReplicaSystemHelper {
+  private static final Logger log = LoggerFactory.getLogger(ReplicaSystemHelper.class);
+
+  private Instance inst;
+  private Credentials creds;
+
+  public ReplicaSystemHelper(Instance inst, Credentials creds) {
+    this.inst = inst;
+    this.creds = creds;
+  }
+
+  /**
+   * Record the updated Status for this file and target
+   * 
+   * @param filePath
+   *          Path to file being replicated
+   * @param status
+   *          Updated Status after replication
+   * @param target
+   *          Peer that was replicated to
+   */
+  public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    try {
+      log.debug("Recording new status for {}, {}", filePath.toString(), ProtobufUtil.toString (status));
+      Mutation m = new Mutation(filePath.toString());
+      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
+      bw.addMutation(m);
+    } finally {
+      bw.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
index 68651ab..11edbb1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
@@ -25,13 +25,9 @@ import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchScanner;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
@@ -41,7 +37,6 @@ import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.StatusFormatter;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
@@ -164,7 +159,7 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
     for (Entry<String,String> property : properties) {
       if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
         if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
-          log.info("Setting formatter for {} from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
+          log.info("Changing formatter for {} table from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
           try {
             tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
           } catch (AccumuloException | AccumuloSecurityException e) {
@@ -193,32 +188,4 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
 
     return true;
   }
-
-  public static Scanner getScanner(Connector conn, Authorizations auths) throws TableNotFoundException {
-    return conn.createScanner(NAME, auths);
-  }
-
-  public static Scanner getScanner(Connector conn) throws TableNotFoundException {
-    return getScanner(conn, new Authorizations());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException {
-    return getBatchWriter(conn, new BatchWriterConfig());
-  }
-
-  public static BatchWriter getBatchWriter(Connector conn, BatchWriterConfig config) throws TableNotFoundException {
-    return conn.createBatchWriter(NAME, config);
-  }
-
-  public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException {
-    return conn.createBatchScanner(NAME, new Authorizations(), queryThreads);
-  }
-
-  public static boolean exists(Connector conn) {
-    return exists(conn.tableOperations());
-  }
-
-  public static boolean exists(TableOperations tops) {
-    return tops.exists(NAME);
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 5e0d726..3f26af9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -94,7 +94,7 @@ public class FinishedWorkUpdater implements Runnable {
           continue;
         }
 
-        log.debug("Processing work progress for {}", serializedRow.getKey().getRow());
+        log.debug("Processing work progress for {} with {} columns", serializedRow.getKey().getRow(), wholeRow.size());
 
         Map<String,Long> tableIdToProgress = new HashMap<>();
         boolean error = false;
@@ -122,7 +122,7 @@ public class FinishedWorkUpdater implements Runnable {
 
           // Find the minimum value for begin (everyone has replicated up to this offset in the file)
           tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
-        }
+        } 
 
         for (Entry<String,Long> progressByTable : tableIdToProgress.entrySet()) {
           log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), progressByTable.getKey(), progressByTable.getValue());
@@ -162,6 +162,8 @@ public class FinishedWorkUpdater implements Runnable {
         }
       }
     } finally {
+      log.debug("Finished updating files with completed replication work");
+
       bs.close();
 
       try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index af43d7d..d168867 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -293,6 +293,8 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
           if (null == keyBeingReplicated) {
             // If there is no file, submit this one for replication
             newReplicationTasksSubmitted += queueWork(key, file, sourceTableId, queuedWorkForPeer);
+          } else if (keyBeingReplicated.startsWith(p.getName())) {
+            log.debug("Not re-queueing work for {} as it has already been queued fore replication to {}", file, target);
           } else {
             log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
           }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index c6b266f..ce44eef 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.client.impl.ClientExecReturn;
 import org.apache.accumulo.core.client.impl.ReplicationClient;
@@ -40,7 +41,10 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
@@ -140,7 +144,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
   }
 
   @Override
-  public Status replicate(final Path p, final Status status, final ReplicationTarget target) {
+  public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
     final Instance localInstance = HdfsZooInstance.getInstance();
     final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
@@ -152,7 +156,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     // Attempt the replication of this status a number of times before giving up and
     // trying to replicate it again later some other time.
-    for (int i = 0; i < localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS); i++) {
+    int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
+    for (int i = 0; i < numAttempts; i++) {
       String peerTserver;
       try {
         // Ask the master on the remote what TServer we should talk with to replicate the data
@@ -177,76 +182,229 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
 
       // We have a tserver on the remote -- send the data its way.
-      ReplicationStats replResult;
-      // TODO should chunk up the given file into some configurable sizes instead of just sending the entire file all at once
-      // configuration should probably just be size based.
+      Status finalStatus;
       final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
       try {
-        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-            new ClientExecReturn<ReplicationStats,ReplicationServicer.Client>() {
-              @Override
-              public ReplicationStats execute(Client client) throws Exception {
-                // RFiles have an extension, call everything else a WAL
-                if (p.getName().endsWith(RFILE_SUFFIX)) {
-                  RFileReplication kvs = getKeyValues(target, p, status, sizeLimit);
-                  if (0 < kvs.keyValues.getKeyValuesSize()) {
-                    long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tCredsForPeer);
-                    if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
-                      log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(),
-                          entriesReplicated);
-                    }
-
-                    // Not as important to track as WALs because we don't skip any KVs in an RFile
-                    return kvs;
-                  }
-                } else {
-                  WalReplication edits = getWalEdits(target, getWalStream(p), p, status, sizeLimit);
-
-                  // If we have some edits to send
-                  if (0 < edits.walEdits.getEditsSize()) {
-                    long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tCredsForPeer);
-                    if (entriesReplicated != edits.numUpdates) {
-                      log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
-                    }
-
-                    // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
-                    // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
-                    return edits;
-                  } else if (edits.entriesConsumed > 0) {
-                    // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
-                    // log entries multiple times to determine if they should be sent
-                    return edits;
-                  }
-                }
-
-                // No data sent (bytes nor records) and no progress made
-                return new ReplicationStats(0l, 0l, 0l);
-              }
-            });
-
-        log.debug("Replicated {} entries from {} to {} which is a member of the peer '{}'", replResult.sizeInRecords, p, peerTserver,
-            peerInstance.getInstanceName());
-
-        // Catch the overflow
-        long newBegin = status.getBegin() + replResult.entriesConsumed;
-        if (newBegin < 0) {
-          newBegin = Long.MAX_VALUE;
+        if (p.getName().endsWith(RFILE_SUFFIX)) {
+          finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+        } else {
+          finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
         }
 
-        // Update the begin to account for what we replicated
-        Status updatedStatus = Status.newBuilder(status).setBegin(newBegin).build();
+        log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 
-        return updatedStatus;
+        return finalStatus;
       } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
         log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
-        UtilWaitThread.sleep(250);
+        UtilWaitThread.sleep(1000);
       }
     }
 
+    log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
+
     // We made no status, punt on it for now, and let it re-queue itself for work
     return status;
   }
 
+  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
+      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
+      AccumuloSecurityException {
+    DataInputStream input;
+    try {
+      input = getRFileInputStream(p);
+    } catch (IOException e) {
+      log.error("Could not create input stream from RFile, will retry", e);
+      return status;
+    }
+
+    Status lastStatus = status, currentStatus = status;
+    while (true) {
+      // Read and send a batch of mutations
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+          new RFileClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds));
+
+      // Catch the overflow
+      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+      if (newBegin < 0) {
+        newBegin = Long.MAX_VALUE;
+      }
+
+      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+      // If we got a different status
+      if (!currentStatus.equals(lastStatus)) {
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(currentStatus)) {
+          return currentStatus;
+        } else {
+          // Otherwise, let it loop and replicate some more data
+          lastStatus = currentStatus;
+        }
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+
+        // otherwise, we didn't actually replicate (likely because there was error sending the data)
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return status;
+      }
+    }
+  }
+
+  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
+      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
+      AccumuloSecurityException {
+
+    final Set<Integer> tids;
+    final DataInputStream input;
+    try {
+      input = getWalStream(p);
+    } catch (IOException e) {
+      log.error("Could not create stream for WAL", e);
+      // No data sent (bytes nor records) and no progress made
+      return status;
+    }
+
+    try {
+      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
+      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
+      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
+    } catch (IOException e) {
+      log.warn("Unexpected error consuming file.");
+      return status;
+    }
+
+    Status lastStatus = status, currentStatus = status;
+    while (true) {
+      // Read and send a batch of mutations
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+          new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+
+      // Catch the overflow
+      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
+      if (newBegin < 0) {
+        newBegin = Long.MAX_VALUE;
+      }
+
+      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
+
+      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
+
+      // If we got a different status
+      if (!currentStatus.equals(lastStatus)) {
+        try {
+          helper.recordNewStatus(p, currentStatus, target);
+        } catch (TableNotFoundException e) {
+          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
+          throw new RuntimeException("Replication table did not exist, will retry", e);
+        }
+
+        // If we don't have any more work, just quit
+        if (!StatusUtil.isWorkRequired(currentStatus)) {
+          return currentStatus;
+        } else {
+          // Otherwise, let it loop and replicate some more data
+          lastStatus = currentStatus;
+        }
+      } else {
+        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
+
+        // otherwise, we didn't actually replicate (likely because there was error sending the data)
+        // we can just not record any updates, and it will be picked up again by the work assigner
+        return status;
+      }
+    }
+  }
+
+  protected class WalClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
+
+    private ReplicationTarget target;
+    private DataInputStream input;
+    private Path p;
+    private Status status;
+    private long sizeLimit;
+    private int remoteTableId;
+    private TCredentials tcreds;
+    private Set<Integer> tids;
+
+    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds, Set<Integer> tids) {
+      this.target = target;
+      this.input = input;
+      this.p = p;
+      this.status = status;
+      this.sizeLimit = sizeLimit;
+      this.remoteTableId = remoteTableId;
+      this.tcreds = tcreds;
+      this.tids = tids;
+    }
+
+    @Override
+    public ReplicationStats execute(Client client) throws Exception {
+      WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
+
+      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'",
+          (Long.MAX_VALUE == edits.entriesConsumed) ? "all" : edits.entriesConsumed, edits.sizeInBytes, p);
+
+      // If we have some edits to send
+      if (0 < edits.walEdits.getEditsSize()) {
+        long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
+        if (entriesReplicated != edits.numUpdates) {
+          log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
+        }
+
+        // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
+        // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
+        return edits;
+      } else if (edits.entriesConsumed > 0) {
+        // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
+        // log entries multiple times to determine if they should be sent
+        return edits;
+      }
+
+      // No data sent (bytes nor records) and no progress made
+      return new ReplicationStats(0l, 0l, 0l);
+    }
+  }
+
+  protected class RFileClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
+
+    private ReplicationTarget target;
+    private DataInputStream input;
+    private Path p;
+    private Status status;
+    private long sizeLimit;
+    private int remoteTableId;
+    private TCredentials tcreds;
+
+    public RFileClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds) {
+      this.target = target;
+      this.input = input;
+      this.p = p;
+      this.status = status;
+      this.sizeLimit = sizeLimit;
+      this.remoteTableId = remoteTableId;
+      this.tcreds = tcreds;
+    }
+
+    @Override
+    public ReplicationStats execute(Client client) throws Exception {
+      RFileReplication kvs = getKeyValues(target, input, p, status, sizeLimit);
+      if (0 < kvs.keyValues.getKeyValuesSize()) {
+        long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tcreds);
+        if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
+          log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(), entriesReplicated);
+        }
+
+        // Not as important to track as WALs because we don't skip any KVs in an RFile
+        return kvs;
+      }
+
+      // No data sent (bytes nor records) and no progress made
+      return new ReplicationStats(0l, 0l, 0l);
+    }
+  }
+
   protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
     Preconditions.checkNotNull(conf);
     Preconditions.checkNotNull(target);
@@ -269,12 +427,13 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return new ZooKeeperInstance(instanceName, zookeepers);
   }
 
-  protected RFileReplication getKeyValues(ReplicationTarget target, Path p, Status status, long sizeLimit) {
-    // TODO Implement me
+  protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit) {
+    // TODO ACCUMULO-2580 Implement me
     throw new UnsupportedOperationException();
   }
 
-  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
+  protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
+    Set<Integer> tids = new HashSet<>();
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
@@ -284,13 +443,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     // We also need to track the tids that occurred earlier in the file as mutations
     // later on might use that tid
     for (long i = 0; i < status.getBegin(); i++) {
-      try {
-        key.readFields(wal);
-        value.readFields(wal);
-      } catch (EOFException e) {
-        log.warn("Unexpectedly reached the end of file.");
-        return new WalReplication(new WalEdits(), 0, 0, 0);
-      }
+      key.readFields(wal);
+      value.readFields(wal);
 
       switch (key.event) {
         case DEFINE_TABLET:
@@ -303,20 +457,16 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
     }
 
-    WalReplication repl = getEdits(wal, sizeLimit, target, status, p, desiredTids);
-
-    log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == repl.entriesConsumed) ? "all"
-        : repl.entriesConsumed, repl.sizeInBytes, p);
-
-    return repl;
+    return tids;
   }
 
-  protected DataInputStream getWalStream(Path p) throws IOException {
+  public DataInputStream getWalStream(Path p) throws IOException {
     DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
     return streams.getDecryptingInputStream();
   }
 
-  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p, Set<Integer> desiredTids) throws IOException {
+  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)
+      throws IOException {
     WalEdits edits = new WalEdits();
     edits.edits = new ArrayList<ByteBuffer>();
     long size = 0l;
@@ -413,6 +563,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return mutationsToSend;
   }
 
+  protected DataInputStream getRFileInputStream(Path p) throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
   public static class ReplicationStats {
     /**
      * The size, in bytes, of the data sent
@@ -434,6 +588,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       this.sizeInRecords = sizeInRecords;
       this.entriesConsumed = entriesConsumed;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof ReplicationStats) { // instanceof is false for null, so equals(null) returns false instead of throwing NPE from o.getClass()
+        ReplicationStats other = (ReplicationStats) o;
+        return sizeInBytes == other.sizeInBytes && sizeInRecords == other.sizeInRecords && entriesConsumed == other.entriesConsumed;
+      }
+      return false;
+    }
   }
 
   public static class RFileReplication extends ReplicationStats {
@@ -467,5 +630,16 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       this.walEdits = edits;
       this.numUpdates = numMutations;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof WalReplication) {
+        WalReplication other = (WalReplication) o;
+
+        return super.equals(other) && walEdits.equals(other.walEdits) && numUpdates == other.numUpdates;
+      }
+
+      return false;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 50c79d6..3ebcda9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -22,8 +22,6 @@ import java.util.NoSuchElementException;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -31,9 +29,9 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
@@ -50,7 +48,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterables;
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
 
 /**
  * Transmit the given data to a peer
@@ -62,12 +59,14 @@ public class ReplicationProcessor implements Processor {
   private final AccumuloConfiguration conf;
   private final VolumeManager fs;
   private final Credentials creds;
+  private final ReplicaSystemHelper helper;
 
   public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, VolumeManager fs, Credentials creds) {
     this.inst = inst;
     this.conf = conf;
     this.fs = fs;
     this.creds = creds;
+    this.helper = new ReplicaSystemHelper(inst, creds);
   }
 
   @Override
@@ -82,12 +81,8 @@ public class ReplicationProcessor implements Processor {
 
     log.debug("Received replication work for {} to {}", file, target);
 
-    // Find the configured replication peer so we know how to replicate to it
-    // Classname,Configuration
-    String peerType = getPeerType(target.getPeerName());
+    ReplicaSystem replica = getReplicaSystem(target);
 
-    // Get the peer that we're replicating to
-    ReplicaSystem replica = ReplicaSystemFactory.get(peerType);
     Status status;
     try {
       status = getStatus(file, target);
@@ -113,8 +108,7 @@ public class ReplicationProcessor implements Processor {
     // Sanity check that nothing bad happened and our replication source still exists
     Path filePath = new Path(file);
     try {
-      if (!fs.exists(filePath)) {
-        log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
+      if (!doesFileExist(filePath, target)) {
         return;
       }
     } catch (IOException e) {
@@ -124,7 +118,20 @@ public class ReplicationProcessor implements Processor {
 
     log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
 
-    replicate(replica, filePath, status, target);
+    replica.replicate(filePath, status, target, getHelper());
+  }
+
+  protected ReplicaSystemHelper getHelper() {
+    return helper;
+  }
+
+  protected ReplicaSystem getReplicaSystem(ReplicationTarget target) {
+    // Find the configured replication peer so we know how to replicate to it
+    // Classname,Configuration
+    String peerType = getPeerType(target.getPeerName());
+
+    // Get the peer that we're replicating to
+    return ReplicaSystemFactory.get(peerType);
   }
 
   protected String getPeerType(String peerName) {
@@ -140,6 +147,15 @@ public class ReplicationProcessor implements Processor {
     return peerType;
   }
 
+  protected boolean doesFileExist(Path filePath, ReplicationTarget target) throws IOException {
+    if (!fs.exists(filePath)) {
+      log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
+      return false;
+    }
+
+    return true;
+  }
+
   protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
       InvalidProtocolBufferException {
     Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
@@ -148,62 +164,4 @@ public class ReplicationProcessor implements Processor {
 
     return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
   }
-
-  protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
-    Status lastStatus = status;
-    while (true) {
-      // Replicate that sucker
-      Status replicatedStatus = replica.replicate(filePath, status, target);
-  
-      log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-  
-      // If we got a different status
-      if (!replicatedStatus.equals(lastStatus)) {
-        // We actually did some work!
-        recordNewStatus(filePath, replicatedStatus, target);
-
-        // If we don't have any more work, just quit
-        if (!StatusUtil.isWorkRequired(replicatedStatus)) {
-          return;
-        } else {
-          // Otherwise, let it loop and replicate some more data
-          lastStatus = status;
-          status = replicatedStatus;
-        }
-      } else {
-        log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
-            TextFormat.shortDebugString(replicatedStatus));
-  
-        // otherwise, we didn't actually replicate because there was error sending the data
-        // we can just not record any updates, and it will be picked up again by the work assigner      
-        return;
-      }
-    }
-  }
-
-  /**
-   * Record the updated Status for this file and target
-   * 
-   * @param filePath
-   *          Path to file being replicated
-   * @param status
-   *          Updated Status after replication
-   * @param target
-   *          Peer that was replicated to
-   */
-  protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
-    try {
-      Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
-      BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-      log.debug("Recording new status for {}, {}", filePath.toString(), TextFormat.shortDebugString(status));
-      Mutation m = new Mutation(filePath.toString());
-      WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
-      bw.addMutation(m);
-      bw.close();
-    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
-      log.error("Error recording updated Status for {}", filePath, e);
-      throw new RuntimeException(e);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 85204e3..4e2901d 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -16,16 +16,23 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -36,10 +43,15 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
+import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.tserver.logger.LogEvents;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.ReplicationStats;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.WalClientExecReturn;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem.WalReplication;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -147,7 +159,7 @@ public class AccumuloReplicaSystemTest {
 
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(9, repl.entriesConsumed);
@@ -254,7 +266,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -319,7 +331,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -341,7 +353,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, Long.MAX_VALUE, new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(0, repl.entriesConsumed);
@@ -401,8 +413,10 @@ public class AccumuloReplicaSystemTest {
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
 
+    HashSet<Integer> tids = new HashSet<>();
+
     // Only consume the first mutation, not the second
-    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1l, tids);
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(2, repl.entriesConsumed);
@@ -411,10 +425,9 @@ public class AccumuloReplicaSystemTest {
     Assert.assertNotEquals(0, repl.sizeInBytes);
 
     status = Status.newBuilder(status).setBegin(2).build();
-    dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
 
     // Consume the rest of the mutations
-    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1l, tids);
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(1, repl.entriesConsumed);
@@ -422,4 +435,62 @@ public class AccumuloReplicaSystemTest {
     Assert.assertEquals(1, repl.sizeInRecords);
     Assert.assertNotEquals(0, repl.sizeInBytes);
   }
+
+  @Test
+  public void dontSendEmptyDataToPeer() throws Exception {
+    Client replClient = createMock(Client.class);
+    AccumuloReplicaSystem ars = createMock(AccumuloReplicaSystem.class);
+    WalEdits edits = new WalEdits(Collections.<ByteBuffer> emptyList());
+    WalReplication walReplication = new WalReplication(edits, 0, 0, 0);
+
+    ReplicationTarget target = new ReplicationTarget("peer", "2", "1");
+    DataInputStream input = null;
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    Status status = null;
+    long sizeLimit = Long.MAX_VALUE;
+    int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+    TCredentials tcreds = null;
+    Set<Integer> tids = new HashSet<>();
+
+    WalClientExecReturn walClientExec = ars.new WalClientExecReturn(target, input, p, status, sizeLimit, remoteTableId, tcreds, tids);
+
+    expect(ars.getWalEdits(target, input, p, status, sizeLimit, tids)).andReturn(walReplication);
+
+    replay(replClient, ars);
+
+    ReplicationStats stats = walClientExec.execute(replClient);
+
+    verify(replClient, ars);
+
+    Assert.assertEquals(new ReplicationStats(0l, 0l, 0l), stats);
+  }
+
+  @Test
+  public void consumedButNotSentDataShouldBeRecorded() throws Exception {
+    Client replClient = createMock(Client.class);
+    AccumuloReplicaSystem ars = createMock(AccumuloReplicaSystem.class);
+    WalEdits edits = new WalEdits(Collections.<ByteBuffer> emptyList());
+    WalReplication walReplication = new WalReplication(edits, 0, 5, 0);
+
+    ReplicationTarget target = new ReplicationTarget("peer", "2", "1");
+    DataInputStream input = null;
+    Path p = new Path("/accumulo/wals/tserver+port/" + UUID.randomUUID().toString());
+    Status status = null;
+    long sizeLimit = Long.MAX_VALUE;
+    int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+    TCredentials tcreds = null;
+    Set<Integer> tids = new HashSet<>();
+
+    WalClientExecReturn walClientExec = ars.new WalClientExecReturn(target, input, p, status, sizeLimit, remoteTableId, tcreds, tids);
+
+    expect(ars.getWalEdits(target, input, p, status, sizeLimit, tids)).andReturn(walReplication);
+
+    replay(replClient, ars);
+
+    ReplicationStats stats = walClientExec.execute(replClient);
+
+    verify(replClient, ars);
+
+    Assert.assertEquals(new ReplicationStats(0l, 0l, 5l), stats);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index c88e091..17d5309 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -24,10 +25,12 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.hadoop.fs.Path;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -71,46 +74,26 @@ public class ReplicationProcessorTest {
   }
 
   @Test
-  public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
     ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
-    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+    ReplicaSystemHelper helper = EasyMock.createMock(ReplicaSystemHelper.class);
+    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethods("getReplicaSystem", "doesFileExist", "getStatus", "getHelper").createMock();
 
     ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     Path path = new Path("/accumulo");
 
-    Status firstStatus = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    Status secondStatus = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    
-    EasyMock.expect(replica.replicate(path, status, target)).andReturn(firstStatus);
-    proc.recordNewStatus(path, firstStatus, target);
-    EasyMock.expectLastCall().once();
+    String queueKey = AbstractWorkAssigner.getQueueKey(path.toString(), target);
 
-    EasyMock.expect(replica.replicate(path, firstStatus, target)).andReturn(secondStatus);
-    proc.recordNewStatus(path, secondStatus, target);
-    EasyMock.expectLastCall().once();
+    EasyMock.expect(proc.getReplicaSystem(target)).andReturn(replica);
+    EasyMock.expect(proc.getStatus(path.toString(), target)).andReturn(status);
+    EasyMock.expect(proc.doesFileExist(path, target)).andReturn(true);
+    EasyMock.expect(proc.getHelper()).andReturn(helper);
+    EasyMock.expect(replica.replicate(path, status, target, helper)).andReturn(status);
 
     EasyMock.replay(replica, proc);
-    
-    proc.replicate(replica, path, status, target);
-
-    EasyMock.verify(replica, proc);
-  }
-
-  @Test
-  public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
-    ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
-    ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
 
-    ReplicationTarget target = new ReplicationTarget("peer", "1", "1");
-    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
-    Path path = new Path("/accumulo");
-
-    EasyMock.expect(replica.replicate(path, status, target)).andReturn(status);
-
-    EasyMock.replay(replica, proc);
-    
-    proc.replicate(replica, path, status, target);
+    proc.process(queueKey, path.toString().getBytes(StandardCharsets.UTF_8));
 
     EasyMock.verify(replica, proc);
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
index ac44f97..91e1a4b 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -16,7 +16,12 @@
  */
 package org.apache.accumulo.test.replication;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.commons.lang.StringUtils;
@@ -24,8 +29,6 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.TextFormat;
-
 /**
  * Fake ReplicaSystem which returns that the data was fully replicated after some sleep period (in milliseconds)
  * <p>
@@ -37,7 +40,7 @@ public class MockReplicaSystem implements ReplicaSystem {
   private long sleep = 0;
 
   @Override
-  public Status replicate(Path p, Status status, ReplicationTarget target) {
+  public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper) {
     Status newStatus;
     if (status.getClosed() && status.getInfiniteEnd()) {
       Status.Builder builder = Status.newBuilder(status);
@@ -48,10 +51,11 @@ public class MockReplicaSystem implements ReplicaSystem {
       }
       newStatus = builder.build();
     } else {
-      log.info("{} with status {} is not closed and with infinite length, ignoring");
+      log.info("{} with status {} is not closed and with infinite length, ignoring", p, status);
       newStatus = status;
     }
 
+    log.debug("Sleeping for {}ms before finishing replication on {}", sleep, p);
     try {
       Thread.sleep(sleep);
     } catch (InterruptedException e) {
@@ -60,7 +64,16 @@ public class MockReplicaSystem implements ReplicaSystem {
       return status;
     }
 
-    log.info("Received {}, returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+    log.info("For {}, received {}, returned {}", p, ProtobufUtil.toString(status), ProtobufUtil.toString(newStatus));
+    try {
+      helper.recordNewStatus(p, newStatus, target);
+    } catch (TableNotFoundException e) {
+      log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(newStatus), e);
+      return status;
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.error("Tried to record new status in replication table for {} as {}, but got an error", p, ProtobufUtil.toString(newStatus), e);
+      return status;
+    }
 
     return newStatus;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f275353e/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index f42c5ad..b59f8da 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -24,8 +24,8 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -148,7 +148,7 @@ public class ReplicationTest extends ConfigurableMacIT {
     int attempts = 5;
     do {
       if (!exists) {
-        UtilWaitThread.sleep(200);
+        UtilWaitThread.sleep(500);
         exists = conn.tableOperations().exists(ReplicationTable.NAME);
         attempts--;
       }
@@ -1079,41 +1079,6 @@ public class ReplicationTest extends ConfigurableMacIT {
       Assert.assertFalse(t.isAlive());
     }
 
-    // write a Long.MAX_VALUE into each repl entry
-    Scanner s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
-    Set<String> filesToWatch = new HashSet<>();
-    Text buff = new Text();
-    for (Entry<Key,Value> entry : s) {
-      StatusSection.getFile(entry.getKey(), buff);
-      filesToWatch.add(buff.toString());
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
-    s = ReplicationTable.getScanner(conn);
-    StatusSection.limit(s);
-    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
-    for (Entry<Key,Value> entry : s) {
-      Status status = Status.parseFrom(entry.getValue().get());
-      Assert.assertFalse(status.getClosed());
-
-      // Fake that each one is fully replicated
-      Mutation m = new Mutation(entry.getKey().getRow());
-      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
-          StatusUtil.fileCreatedValue(System.currentTimeMillis()));
-      bw.addMutation(m);
-    }
-    bw.close();
-
     // Kill the tserver(s) and restart them
     // to ensure that the WALs we previously observed all move to closed.
     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
@@ -1124,7 +1089,7 @@ public class ReplicationTest extends ConfigurableMacIT {
 
     // Make sure we can read all the tables (recovery complete)
     for (String table : Arrays.asList(table1, table2, table3)) {
-      s = conn.createScanner(table, new Authorizations());
+      Scanner s = conn.createScanner(table, new Authorizations());
       for (@SuppressWarnings("unused")
       Entry<Key,Value> entry : s) {}
     }
@@ -1139,7 +1104,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       // We should either find all closed records or no records
       // After they're closed, they are candidates for deletion
       for (int i = 0; i < 10; i++) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
         s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
         Iterator<Entry<Key,Value>> iter = s.iterator();
 
@@ -1162,7 +1127,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       }
 
       if (!allClosed) {
-        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
         s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
         for (Entry<Key,Value> entry : s) {
           log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
@@ -1173,7 +1138,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       for (int i = 0; i < 10; i++) {
         allClosed = true;
 
-        s = ReplicationTable.getScanner(conn);
+        Scanner s = ReplicationTable.getScanner(conn);
         Iterator<Entry<Key,Value>> iter = s.iterator();
 
         long recordsFound = 0l;
@@ -1197,7 +1162,7 @@ public class ReplicationTest extends ConfigurableMacIT {
       }
 
       if (!allClosed) {
-        s = ReplicationTable.getScanner(conn);
+        Scanner s = ReplicationTable.getScanner(conn);
         StatusSection.limit(s);
         for (Entry<Key,Value> entry : s) {
           log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
@@ -1226,8 +1191,8 @@ public class ReplicationTest extends ConfigurableMacIT {
     // replication shouldn't exist when we begin
     Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
 
-    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
-    thread.start();
+//    ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+//    thread.start();
 
     try {
       // Create two tables
@@ -1242,7 +1207,7 @@ public class ReplicationTest extends ConfigurableMacIT {
           conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
           // Use the MockReplicaSystem impl and sleep for 5seconds
           conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
-              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
+              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "1000"));
           attempts = 0;
         } catch (Exception e) {
           attempts--;
@@ -1313,6 +1278,7 @@ public class ReplicationTest extends ConfigurableMacIT {
               case PERMISSION_DENIED:
                 // retry -- the grant didn't happen yet
                 log.warn("Sleeping because permission was denied");
+                break;
               default:
                 throw e;
             }
@@ -1416,7 +1382,7 @@ public class ReplicationTest extends ConfigurableMacIT {
         recordsFound = 0;
         for (Entry<Key,Value> entry : s) {
           recordsFound++;
-          log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
         }
 
         if (0 == recordsFound) {
@@ -1429,8 +1395,8 @@ public class ReplicationTest extends ConfigurableMacIT {
 
       Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
     } finally {
-      thread.interrupt();
-      thread.join(5000);
+//      thread.interrupt();
+//      thread.join(5000);
     }
   }
 }


[04/13] git commit: ACCUMULO-2582 Use the DistributedWorkQueue to list the children instead of doing it by hand.

Posted by el...@apache.org.
ACCUMULO-2582 Use the DistributedWorkQueue to list the children instead of doing it by hand.

The DistributedWorkQueue also has a node for handling the locking of the children.
We don't want to use that, and the name of the node is not public information. So,
we should just use the queue instead of doing it by hand.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cd78169d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cd78169d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cd78169d

Branch: refs/heads/ACCUMULO-378
Commit: cd78169d8925bef610331f74231cd03e8cb91323
Parents: 0f95206
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:51:25 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:51:25 2014 -0400

----------------------------------------------------------------------
 .../accumulo/monitor/servlets/ReplicationServlet.java | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/cd78169d/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 45f6baf..6fc8eed 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.monitor.servlets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
@@ -49,12 +48,13 @@ import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -202,13 +202,13 @@ public class ReplicationServlet extends BasicServlet {
     replicationInProgress.addSortableColumn("Peer Identifier");
     replicationInProgress.addUnsortableColumn("Status");
 
-    String zkRoot = ZooUtil.getRoot(inst);
-    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
-
     // Read the files from the workqueue in zk
+    String zkRoot = ZooUtil.getRoot(inst);
     final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
-    List<String> queuedReplication = zreader.getChildren(workQueuePath);
-    for (String queueKey : queuedReplication) {
+
+    DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
+
+    for (String queueKey : workQueue.getWorkQueued()) {
       Entry<String,ReplicationTarget> entry = AbstractWorkAssigner.fromQueueKey(queueKey);
       String filename = entry.getKey();
       ReplicationTarget target = entry.getValue();


[10/13] git commit: ACCUMULO-378 Try to get the tracing working for tservers sending data

Posted by el...@apache.org.
ACCUMULO-378 Try to get the tracing working for tservers sending data


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/092e22ea
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/092e22ea
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/092e22ea

Branch: refs/heads/ACCUMULO-378
Commit: 092e22ea5509ab8961a80de682687da503bec30a
Parents: 3a619ff
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 20:11:40 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 20:11:40 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 155 ++++++++++---------
 1 file changed, 79 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/092e22ea/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 4051daf..f3a657c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -152,82 +152,84 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
     final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 
-    Trace.on("AccumuloReplicaSystem");
-
-    Instance peerInstance = getPeerInstance(target);
-    // Remote identifier is an integer (table id) in this case.
-    final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
-
-    // Attempt the replication of this status a number of times before giving up and
-    // trying to replicate it again later some other time.
-    int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
-    for (int i = 0; i < numAttempts; i++) {
-      String peerTserver;
-      Span span = Trace.start("Fetch peer tserver");
-      try {
-        // Ask the master on the remote what TServer we should talk with to replicate the data
-        peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
-
-          @Override
-          public String execute(ReplicationCoordinator.Client client) throws Exception {
-            return client.getServicerAddress(remoteTableId, tCredsForPeer);
-          }
-
-        });
-      } catch (AccumuloException | AccumuloSecurityException e) {
-        // No progress is made
-        log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
-        continue;
-      } finally {
-        span.stop();
-      }
+    try {
+      Trace.on("AccumuloReplicaSystem");
+
+      Instance peerInstance = getPeerInstance(target);
+      // Remote identifier is an integer (table id) in this case.
+      final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
+
+      // Attempt the replication of this status a number of times before giving up and
+      // trying to replicate it again later some other time.
+      int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
+      for (int i = 0; i < numAttempts; i++) {
+        String peerTserver;
+        Span span = Trace.start("Fetch peer tserver");
+        try {
+          // Ask the master on the remote what TServer we should talk with to replicate the data
+          peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
+
+            @Override
+            public String execute(ReplicationCoordinator.Client client) throws Exception {
+              return client.getServicerAddress(remoteTableId, tCredsForPeer);
+            }
+
+          });
+        } catch (AccumuloException | AccumuloSecurityException e) {
+          // No progress is made
+          log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
+          continue;
+        } finally {
+          span.stop();
+        }
 
-      if (null == peerTserver) {
-        // Something went wrong, and we didn't get a valid tserver from the remote for some reason
-        log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
-        continue;
-      }
+        if (null == peerTserver) {
+          // Something went wrong, and we didn't get a valid tserver from the remote for some reason
+          log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
+          continue;
+        }
 
-      // We have a tserver on the remote -- send the data its way.
-      Status finalStatus;
-      final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
-      try {
-        if (p.getName().endsWith(RFILE_SUFFIX)) {
-          span = Trace.start("RFile replication");
-          try {
-            finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
-          } finally {
-            span.stop();
-          }
-        } else {
-          span = Trace.start("WAL replication");
-          try {
-            finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
-          } finally {
-            span.stop();
+        // We have a tserver on the remote -- send the data its way.
+        Status finalStatus;
+        final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
+        try {
+          if (p.getName().endsWith(RFILE_SUFFIX)) {
+            span = Trace.start("RFile replication");
+            try {
+              finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+            } finally {
+              span.stop();
+            }
+          } else {
+            span = Trace.start("WAL replication");
+            try {
+              finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+            } finally {
+              span.stop();
+            }
           }
-        }
 
-        log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
+          log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 
-        return finalStatus;
-      } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
-        log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
-        UtilWaitThread.sleep(1000);
+          return finalStatus;
+        } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
+          log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
+          UtilWaitThread.sleep(1000);
+        }
       }
-    }
-
-    log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
-    Trace.offNoFlush();
+      log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
-    // We made no status, punt on it for now, and let it re-queue itself for work
-    return status;
+      // We made no status, punt on it for now, and let it re-queue itself for work
+      return status;
+    } finally {
+      Trace.offNoFlush();
+    }
   }
 
-  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
-      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
-      AccumuloSecurityException {
+  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
+      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
+      AccumuloException, AccumuloSecurityException {
     DataInputStream input;
     try {
       input = getRFileInputStream(p);
@@ -239,8 +241,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Status lastStatus = status, currentStatus = status;
     while (true) {
       // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-          new RFileClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds));
+      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new RFileClientExecReturn(target, input, p,
+          currentStatus, sizeLimit, remoteTableId, tcreds));
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -271,9 +273,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     }
   }
 
-  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p,
-      final Status status, final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException, AccumuloException,
-      AccumuloSecurityException {
+  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
+      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
+      AccumuloException, AccumuloSecurityException {
 
     final Set<Integer> tids;
     final DataInputStream input;
@@ -315,8 +317,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       ReplicationStats replResult;
       try {
         // Read and send a batch of mutations
-        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-            new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
+            remoteTableId, tcreds, tids));
       } finally {
         span.stop();
       }
@@ -371,7 +373,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     private TCredentials tcreds;
     private Set<Integer> tids;
 
-    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds, Set<Integer> tids) {
+    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds,
+        Set<Integer> tids) {
       this.target = target;
       this.input = input;
       this.p = p;
@@ -386,8 +389,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     public ReplicationStats execute(Client client) throws Exception {
       WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
 
-      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'",
-          (Long.MAX_VALUE == edits.entriesConsumed) ? "all" : edits.entriesConsumed, edits.sizeInBytes, p);
+      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == edits.entriesConsumed) ? "all"
+          : edits.entriesConsumed, edits.sizeInBytes, p);
 
       // If we have some edits to send
       if (0 < edits.walEdits.getEditsSize()) {


[03/13] git commit: ACCUMULO-2582 Add a replication-in-progress section to the monitor

Posted by el...@apache.org.
ACCUMULO-2582 Add a replication-in-progress section to the monitor


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f952069
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f952069
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f952069

Branch: refs/heads/ACCUMULO-378
Commit: 0f9520690808762fd1b63bff5159ef7e071cd64d
Parents: 177eabf
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:29:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:29:44 2014 -0400

----------------------------------------------------------------------
 .../monitor/servlets/ReplicationServlet.java    | 73 ++++++++++++++++++++
 1 file changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f952069/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index b8569d1..45f6baf 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -19,16 +19,20 @@ package org.apache.accumulo.monitor.servlets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
@@ -41,17 +45,24 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * 
@@ -61,6 +72,8 @@ public class ReplicationServlet extends BasicServlet {
 
   private static final long serialVersionUID = 1L;
 
+  private ZooCache zooCache = new ZooCache();
+
   @Override
   protected String getTitle(HttpServletRequest req) {
     return "Replication Overview";
@@ -180,6 +193,66 @@ public class ReplicationServlet extends BasicServlet {
     }
 
     replicationStats.generate(req, sb);
+
+    // Make a table for the replication data in progress
+    Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
+    replicationInProgress.addSortableColumn("File");
+    replicationInProgress.addSortableColumn("Peer");
+    replicationInProgress.addSortableColumn("Source Table ID");
+    replicationInProgress.addSortableColumn("Peer Identifier");
+    replicationInProgress.addUnsortableColumn("Status");
+
+    String zkRoot = ZooUtil.getRoot(inst);
+    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+    // Read the files from the workqueue in zk
+    final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
+    List<String> queuedReplication = zreader.getChildren(workQueuePath);
+    for (String queueKey : queuedReplication) {
+      Entry<String,ReplicationTarget> entry = AbstractWorkAssigner.fromQueueKey(queueKey);
+      String filename = entry.getKey();
+      ReplicationTarget target = entry.getValue();
+
+      byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
+
+      if (null != data) {
+        Scanner s = ReplicationTable.getScanner(conn);
+        s.setRange(Range.exact(filename));
+        s.fetchColumn(WorkSection.NAME, target.toText());
+
+        // Fetch the work entry for this item
+        String status = "Unknown";
+        Entry<Key,Value> kv = null;
+        try {
+          kv = Iterables.getOnlyElement(s);
+        } catch (NoSuchElementException e) {
+         log.trace("Could not find status of {} replicating to {}", filename, target);
+         status = "Unknown";
+        } finally {
+          s.close();
+        }
+
+        // If we found the work entry for it, try to compute some progress
+        if (null != entry) {
+          try {
+            Status stat = Status.parseFrom(kv.getValue().get());
+            if (stat.getInfiniteEnd()) {
+              status = stat.getBegin() + "/&infin;";
+            } else {
+              status = stat.getBegin() + "/" + stat.getEnd();
+            }
+          } catch (InvalidProtocolBufferException e) {
+            log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+            status = "Unknown";
+          }
+        }
+
+        // Add a row in the table
+        replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
+      }
+    }
+
+    replicationInProgress.generate(req, sb);
   }
 
   protected Map<String,String> invert(Map<String,String> map) {


[12/13] git commit: ACCUMULO-2582 Get rid of the exceptions in the monitor log from trying to parse conf values we didn't want

Posted by el...@apache.org.
ACCUMULO-2582 Get rid of the exceptions in the monitor log from trying to parse conf values we didn't want


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b55f3b2a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b55f3b2a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b55f3b2a

Branch: refs/heads/ACCUMULO-378
Commit: b55f3b2a0ed306da87988d44471a346e17f2f41b
Parents: d63bb8a
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 20:15:09 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 20:15:09 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/monitor/servlets/ReplicationServlet.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b55f3b2a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 7861ba0..5e7f255 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -105,7 +105,9 @@ public class ReplicationServlet extends BasicServlet {
 
     // Get the defined peers and what ReplicaSystem impl they're using
     for (Entry<String,String> property : properties.entrySet()) {
-      if (property.getKey().startsWith(definedPeersPrefix)) {
+      String key = property.getKey();
+      // Filter out cruft that we don't want
+      if (key.startsWith(definedPeersPrefix) && !key.startsWith(Property.REPLICATION_PEER_USER.getKey()) && !key.startsWith(Property.REPLICATION_PEER_PASSWORD.getKey())) {
         String peerName = property.getKey().substring(definedPeersPrefix.length());
         ReplicaSystem replica;
         try {


[08/13] git commit: ACCUMULO-2582 Finally use the right path and add a label for non-textual data

Posted by el...@apache.org.
ACCUMULO-2582 Finally use the right path and add a label for non-textual data


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9dec25ef
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9dec25ef
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9dec25ef

Branch: refs/heads/ACCUMULO-378
Commit: 9dec25effb895b20f8de1431a8c40d4ae0814bfa
Parents: 3c58a19
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 17:30:10 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 17:30:10 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/monitor/servlets/ReplicationServlet.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9dec25ef/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 8655462..6a73892 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -220,7 +220,7 @@ public class ReplicationServlet extends BasicServlet {
       // can't find the status quickly
       String status = "Unknown";
       if (null != data) {
-        String path = new String(filename);
+        String path = new String(data);
         Scanner s = ReplicationTable.getScanner(conn);
         s.setRange(Range.exact(path));
         s.fetchColumn(WorkSection.NAME, target.toText());
@@ -244,9 +244,9 @@ public class ReplicationServlet extends BasicServlet {
               status = "Finished";
             } else {
               if (stat.getInfiniteEnd()) {
-                status = stat.getBegin() + "/&infin;";
+                status = stat.getBegin() + "/&infin; records";
               } else {
-                status = stat.getBegin() + "/" + stat.getEnd();
+                status = stat.getBegin() + "/" + stat.getEnd() + " records";
               }
             }
           } catch (InvalidProtocolBufferException e) {


[09/13] git commit: ACCUMULO-378 Add tracing into the AccumuloReplicaSystem.

Posted by el...@apache.org.
ACCUMULO-378 Add tracing into the AccumuloReplicaSystem.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3a619ffe
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3a619ffe
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3a619ffe

Branch: refs/heads/ACCUMULO-378
Commit: 3a619ffe08d8d90432218a3138faa572017b1f06
Parents: 9dec25e
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 18:20:31 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 18:20:31 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 53 ++++++++++++++++++--
 1 file changed, 48 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3a619ffe/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index ce44eef..4051daf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -58,6 +58,8 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 import org.apache.accumulo.tserver.logger.LogFileKey;
@@ -150,6 +152,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
     final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 
+    Trace.on("AccumuloReplicaSystem");
+
     Instance peerInstance = getPeerInstance(target);
     // Remote identifier is an integer (table id) in this case.
     final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
@@ -159,6 +163,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
     for (int i = 0; i < numAttempts; i++) {
       String peerTserver;
+      Span span = Trace.start("Fetch peer tserver");
       try {
         // Ask the master on the remote what TServer we should talk with to replicate the data
         peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
@@ -173,6 +178,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
         // No progress is made
         log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
         continue;
+      } finally {
+        span.stop();
       }
 
       if (null == peerTserver) {
@@ -186,9 +193,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
       try {
         if (p.getName().endsWith(RFILE_SUFFIX)) {
-          finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          span = Trace.start("RFile replication");
+          try {
+            finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          } finally {
+            span.stop();
+          }
         } else {
-          finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          span = Trace.start("WAL replication");
+          try {
+            finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
+          } finally {
+            span.stop();
+          }
         }
 
         log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
@@ -202,6 +219,8 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 
+    Trace.offNoFlush();
+
     // We made no status, punt on it for now, and let it re-queue itself for work
     return status;
   }
@@ -258,14 +277,20 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
     final Set<Integer> tids;
     final DataInputStream input;
+    Span span = Trace.start("Read WAL header");
+    span.data("file", p.toString());
     try {
       input = getWalStream(p);
     } catch (IOException e) {
       log.error("Could not create stream for WAL", e);
       // No data sent (bytes nor records) and no progress made
       return status;
+    } finally {
+      span.stop();
     }
 
+    span = Trace.start("Consume WAL prefix");
+    span.data("file", p.toString());
     try {
       // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
       // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
@@ -273,13 +298,28 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     } catch (IOException e) {
       log.warn("Unexpected error consuming file.");
       return status;
+    } finally {
+      span.stop();
     }
 
     Status lastStatus = status, currentStatus = status;
     while (true) {
-      // Read and send a batch of mutations
-      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
-          new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+      // Set some trace info
+      span = Trace.start("Replicate WAL batch");
+      span.data("Size limit", Long.toString(sizeLimit));
+      span.data("File", p.toString());
+      span.data("Peer instance name", peerInstance.getInstanceName());
+      span.data("Peer tserver", peerTserver);
+      span.data("Remote table ID", Integer.toString(remoteTableId));
+
+      ReplicationStats replResult;
+      try {
+        // Read and send a batch of mutations
+        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver,
+            new WalClientExecReturn(target, input, p, currentStatus, sizeLimit, remoteTableId, tcreds, tids));
+      } finally {
+        span.stop();
+      }
 
       // Catch the overflow
       long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
@@ -293,11 +333,14 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
       // If we got a different status
       if (!currentStatus.equals(lastStatus)) {
+        span = Trace.start("Update replication table");
         try {
           helper.recordNewStatus(p, currentStatus, target);
         } catch (TableNotFoundException e) {
           log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
           throw new RuntimeException("Replication table did not exist, will retry", e);
+        } finally {
+          span.stop();
         }
 
         // If we don't have any more work, just quit


[07/13] git commit: ACCUMULO-2582 Use the full path, not just the file name.

Posted by el...@apache.org.
ACCUMULO-2582 Use the full path, not just the file name.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3c58a19b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3c58a19b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3c58a19b

Branch: refs/heads/ACCUMULO-378
Commit: 3c58a19b2054773841d823855af7a10469d855e9
Parents: be78097
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 17:17:24 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 17:17:24 2014 -0400

----------------------------------------------------------------------
 .../monitor/servlets/ReplicationServlet.java    | 59 +++++++++++---------
 1 file changed, 33 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3c58a19b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 234c7b4..8655462 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -214,38 +214,45 @@ public class ReplicationServlet extends BasicServlet {
       String filename = queueKeyPair.getKey();
       ReplicationTarget target = queueKeyPair.getValue();
 
-      Scanner s = ReplicationTable.getScanner(conn);
-      s.setRange(Range.exact(filename));
-      s.fetchColumn(WorkSection.NAME, target.toText());
+      byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
 
-      // Fetch the work entry for this item
+      // We could try to grep over the table, but without knowing the full file path, we
+      // can't find the status quickly
       String status = "Unknown";
-      Entry<Key,Value> kv = null;
-      try {
-        kv = Iterables.getOnlyElement(s);
-      } catch (NoSuchElementException e) {
-       log.trace("Could not find status of {} replicating to {}", filename, target);
-       status = "Unknown";
-      } finally {
-        s.close();
-      }
-
-      // If we found the work entry for it, try to compute some progress
-      if (null != kv) {
+      if (null != data) {
+        String path = new String(filename);
+        Scanner s = ReplicationTable.getScanner(conn);
+        s.setRange(Range.exact(path));
+        s.fetchColumn(WorkSection.NAME, target.toText());
+  
+        // Fetch the work entry for this item
+        Entry<Key,Value> kv = null;
         try {
-          Status stat = Status.parseFrom(kv.getValue().get());
-          if (StatusUtil.isFullyReplicated(stat)) {
-            status = "Finished";
-          } else {
-            if (stat.getInfiniteEnd()) {
-              status = stat.getBegin() + "/&infin;";
+          kv = Iterables.getOnlyElement(s);
+        } catch (NoSuchElementException e) {
+         log.trace("Could not find status of {} replicating to {}", filename, target);
+         status = "Unknown";
+        } finally {
+          s.close();
+        }
+  
+        // If we found the work entry for it, try to compute some progress
+        if (null != kv) {
+          try {
+            Status stat = Status.parseFrom(kv.getValue().get());
+            if (StatusUtil.isFullyReplicated(stat)) {
+              status = "Finished";
             } else {
-              status = stat.getBegin() + "/" + stat.getEnd();
+              if (stat.getInfiniteEnd()) {
+                status = stat.getBegin() + "/&infin;";
+              } else {
+                status = stat.getBegin() + "/" + stat.getEnd();
+              }
             }
+          } catch (InvalidProtocolBufferException e) {
+            log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+            status = "Unknown";
           }
-        } catch (InvalidProtocolBufferException e) {
-          log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
-          status = "Unknown";
         }
       }
 


[05/13] git commit: ACCUMULO-2582 Used the wrong variable in the conditional

Posted by el...@apache.org.
ACCUMULO-2582 Used the wrong variable in the conditional


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/80a1688d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/80a1688d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/80a1688d

Branch: refs/heads/ACCUMULO-378
Commit: 80a1688d75cb1a060b849132371ab8fa605e9243
Parents: cd78169
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:57:43 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:57:43 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/monitor/servlets/ReplicationServlet.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/80a1688d/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 6fc8eed..9c6a1ea 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -209,9 +209,9 @@ public class ReplicationServlet extends BasicServlet {
     DistributedWorkQueue workQueue = new DistributedWorkQueue(workQueuePath, ServerConfiguration.getSystemConfiguration(inst));
 
     for (String queueKey : workQueue.getWorkQueued()) {
-      Entry<String,ReplicationTarget> entry = AbstractWorkAssigner.fromQueueKey(queueKey);
-      String filename = entry.getKey();
-      ReplicationTarget target = entry.getValue();
+      Entry<String,ReplicationTarget> queueKeyPair = AbstractWorkAssigner.fromQueueKey(queueKey);
+      String filename = queueKeyPair.getKey();
+      ReplicationTarget target = queueKeyPair.getValue();
 
       byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
 
@@ -233,7 +233,7 @@ public class ReplicationServlet extends BasicServlet {
         }
 
         // If we found the work entry for it, try to compute some progress
-        if (null != entry) {
+        if (null != kv) {
           try {
             Status stat = Status.parseFrom(kv.getValue().get());
             if (stat.getInfiniteEnd()) {


[13/13] git commit: ACCUMULO-378 More consumable label on trace data

Posted by el...@apache.org.
ACCUMULO-378 More consumable label on trace data


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/fa18d9dc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/fa18d9dc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/fa18d9dc

Branch: refs/heads/ACCUMULO-378
Commit: fa18d9dcf9d6cd052c8b17403e95fc36fd347f72
Parents: b55f3b2
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 20:30:21 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 20:30:21 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/replication/AccumuloReplicaSystem.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/fa18d9dc/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index f3a657c..7233187 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -308,7 +308,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     while (true) {
       // Set some trace info
       span = Trace.start("Replicate WAL batch");
-      span.data("Size limit", Long.toString(sizeLimit));
+      span.data("Batch size (bytes)", Long.toString(sizeLimit));
       span.data("File", p.toString());
       span.data("Peer instance name", peerInstance.getInstanceName());
       span.data("Peer tserver", peerTserver);


[06/13] git commit: ACCUMULO-2582 Just use the Work status instead of data in ZK

Posted by el...@apache.org.
ACCUMULO-2582 Just use the Work status instead of data in ZK


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/be780974
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/be780974
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/be780974

Branch: refs/heads/ACCUMULO-378
Commit: be7809748aab6eb296018a5c60846df7d5dac6d1
Parents: 80a1688
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 17:09:42 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 17:09:42 2014 -0400

----------------------------------------------------------------------
 .../monitor/servlets/ReplicationServlet.java    | 55 ++++++++++----------
 1 file changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/be780974/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 9c6a1ea..234c7b4 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
@@ -213,43 +214,43 @@ public class ReplicationServlet extends BasicServlet {
       String filename = queueKeyPair.getKey();
       ReplicationTarget target = queueKeyPair.getValue();
 
-      byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
-
-      if (null != data) {
-        Scanner s = ReplicationTable.getScanner(conn);
-        s.setRange(Range.exact(filename));
-        s.fetchColumn(WorkSection.NAME, target.toText());
+      Scanner s = ReplicationTable.getScanner(conn);
+      s.setRange(Range.exact(filename));
+      s.fetchColumn(WorkSection.NAME, target.toText());
+
+      // Fetch the work entry for this item
+      String status = "Unknown";
+      Entry<Key,Value> kv = null;
+      try {
+        kv = Iterables.getOnlyElement(s);
+      } catch (NoSuchElementException e) {
+       log.trace("Could not find status of {} replicating to {}", filename, target);
+       status = "Unknown";
+      } finally {
+        s.close();
+      }
 
-        // Fetch the work entry for this item
-        String status = "Unknown";
-        Entry<Key,Value> kv = null;
+      // If we found the work entry for it, try to compute some progress
+      if (null != kv) {
         try {
-          kv = Iterables.getOnlyElement(s);
-        } catch (NoSuchElementException e) {
-         log.trace("Could not find status of {} replicating to {}", filename, target);
-         status = "Unknown";
-        } finally {
-          s.close();
-        }
-
-        // If we found the work entry for it, try to compute some progress
-        if (null != kv) {
-          try {
-            Status stat = Status.parseFrom(kv.getValue().get());
+          Status stat = Status.parseFrom(kv.getValue().get());
+          if (StatusUtil.isFullyReplicated(stat)) {
+            status = "Finished";
+          } else {
             if (stat.getInfiniteEnd()) {
               status = stat.getBegin() + "/&infin;";
             } else {
               status = stat.getBegin() + "/" + stat.getEnd();
             }
-          } catch (InvalidProtocolBufferException e) {
-            log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
-            status = "Unknown";
           }
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+          status = "Unknown";
         }
-
-        // Add a row in the table
-        replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
       }
+
+      // Add a row in the table
+      replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
     }
 
     replicationInProgress.generate(req, sb);


[02/13] git commit: ACCUMULO-378 Tone down logging into something a bit more useful for FinishedWorkUpdater.

Posted by el...@apache.org.
ACCUMULO-378 Tone down logging into something a bit more useful for FinishedWorkUpdater.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/177eabff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/177eabff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/177eabff

Branch: refs/heads/ACCUMULO-378
Commit: 177eabff7c4e93b0df069a2941c3053a80365b9c
Parents: f275353
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:29:25 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:29:25 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/replication/FinishedWorkUpdater.java       | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/177eabff/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
index 3f26af9..8048e02 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/FinishedWorkUpdater.java
@@ -124,10 +124,6 @@ public class FinishedWorkUpdater implements Runnable {
           tableIdToProgress.put(target.getSourceTableId(), Math.min(tableIdToProgress.get(target.getSourceTableId()), status.getBegin()));
         } 
 
-        for (Entry<String,Long> progressByTable : tableIdToProgress.entrySet()) {
-          log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), progressByTable.getKey(), progressByTable.getValue());
-        }
-
         if (error) {
           continue;
         }
@@ -141,6 +137,8 @@ public class FinishedWorkUpdater implements Runnable {
 
           serializedRow.getKey().getRow(buffer);
 
+          log.debug("For {}, source table ID {} has replicated through {}", serializedRow.getKey().getRow(), entry.getKey(), entry.getValue());
+
           Mutation replMutation = new Mutation(buffer);
 
           // Set that we replicated at least this much data, ignoring the other fields


[11/13] git commit: ACCUMULO-2582 Use the full path to the file when we know it

Posted by el...@apache.org.
ACCUMULO-2582 Use the full path to the file when we know it


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d63bb8a8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d63bb8a8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d63bb8a8

Branch: refs/heads/ACCUMULO-378
Commit: d63bb8a883dae5e020f617f674edfb8319bda94a
Parents: 092e22e
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 17:48:08 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 20:12:17 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/monitor/servlets/ReplicationServlet.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d63bb8a8/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index 6a73892..7861ba0 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -219,8 +219,9 @@ public class ReplicationServlet extends BasicServlet {
       // We could try to grep over the table, but without knowing the full file path, we
       // can't find the status quickly
       String status = "Unknown";
+      String path = null;
       if (null != data) {
-        String path = new String(data);
+        path = new String(data);
         Scanner s = ReplicationTable.getScanner(conn);
         s.setRange(Range.exact(path));
         s.fetchColumn(WorkSection.NAME, target.toText());
@@ -257,7 +258,7 @@ public class ReplicationServlet extends BasicServlet {
       }
 
       // Add a row in the table
-      replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
+      replicationInProgress.addRow(null == path ? ".../" + filename : path, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
     }
 
     replicationInProgress.generate(req, sb);