Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:48 UTC

[48/51] [abbrv] git commit: Merge remote-tracking branch 'origin/master' into ACCUMULO-378

Merge remote-tracking branch 'origin/master' into ACCUMULO-378

Conflicts:
	server/tserver/pom.xml


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f8831026
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f8831026
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f8831026

Branch: refs/heads/master
Commit: f8831026a2a0a8e4be8c0ee6fcb9b296a78d2bee
Parents: 264fad8 1392992
Author: Josh Elser <el...@apache.org>
Authored: Fri Jun 13 14:50:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jun 13 14:50:44 2014 -0400

----------------------------------------------------------------------
 bin/config.sh                                   |   6 +-
 bin/start-server.sh                             |   6 +-
 .../org/apache/accumulo/core/Constants.java     |   2 +
 .../accumulo/core/bloomfilter/BloomFilter.java  |   5 +-
 .../accumulo/core/cli/BatchWriterOpts.java      |   1 +
 .../core/client/impl/TableOperationsImpl.java   |   6 +-
 .../core/client/impl/ThriftTransportPool.java   |   2 +-
 .../client/mock/MockInstanceOperations.java     |   4 +-
 .../client/mock/MockNamespaceOperations.java    |   5 +-
 .../core/client/mock/MockTableOperations.java   |   4 +-
 .../accumulo/core/conf/ConfigurationDocGen.java |   2 +-
 .../core/conf/ObservableConfiguration.java      | 109 ++++++
 .../apache/accumulo/core/file/rfile/RFile.java  |   2 +-
 .../accumulo/core/iterators/Combiner.java       |   8 +-
 .../core/iterators/user/RegExFilter.java        |   7 +-
 .../core/util/format/DefaultFormatter.java      |  16 +-
 .../core/conf/ObservableConfigurationTest.java  |  96 ++++++
 .../iterators/aggregation/NumSummationTest.java |   6 +-
 .../iterators/user/VersioningIteratorTest.java  |  10 +-
 .../main/asciidoc/chapters/administration.txt   |  49 ++-
 docs/src/main/asciidoc/chapters/clients.txt     |  10 +-
 docs/src/main/asciidoc/chapters/design.txt      |   2 +-
 .../asciidoc/chapters/table_configuration.txt   |   8 +
 .../examples/simple/dirlist/Viewer.java         |   4 +-
 .../simple/isolation/InterferenceTest.java      |   2 +-
 .../simple/mapreduce/TokenFileWordCount.java    |   5 +-
 .../examples/simple/mapreduce/WordCount.java    |   5 +-
 .../examples/simple/reservations/ARS.java       |   5 +-
 .../examples/simple/dirlist/CountTest.java      |   6 +-
 .../org/apache/accumulo/fate/AdminUtil.java     |   6 +-
 .../accumulo/fate/zookeeper/ZooCache.java       |  86 ++++-
 .../accumulo/fate/zookeeper/ZooCacheTest.java   | 345 +++++++++++++++++++
 .../client/mapred/AccumuloFileOutputFormat.java |   3 +-
 .../client/mapred/AccumuloOutputFormat.java     |   4 +-
 .../client/mapreduce/AccumuloOutputFormat.java  |   2 +-
 .../mapreduce/lib/impl/ConfiguratorBase.java    |  38 +-
 .../mapred/AccumuloFileOutputFormatTest.java    |   3 +
 .../lib/impl/ConfiguratorBaseTest.java          |   8 +
 .../apache/accumulo/cluster/AccumuloConfig.java |  11 +
 .../minicluster/MiniAccumuloConfig.java         |  14 +
 .../minicluster/MiniAccumuloRunner.java         |  24 +-
 .../impl/MiniAccumuloClusterImpl.java           |  13 +-
 .../impl/MiniAccumuloConfigImpl.java            |  20 ++
 .../impl/MiniAccumuloConfigImplTest.java        |   8 +
 .../accumulo/server/client/BulkImporter.java    |   6 +-
 .../accumulo/server/client/HdfsZooInstance.java |   8 -
 .../server/conf/NamespaceConfWatcher.java       |   2 +-
 .../server/conf/NamespaceConfiguration.java     |  35 +-
 .../accumulo/server/conf/TableConfWatcher.java  |   2 +-
 .../server/conf/TableConfiguration.java         |  36 +-
 .../accumulo/server/conf/ZooConfiguration.java  |   7 +-
 .../accumulo/server/fs/VolumeManagerImpl.java   |  14 +-
 .../master/balancer/ChaoticLoadBalancer.java    |   7 +-
 .../server/master/recovery/RecoveryPath.java    |   7 +-
 .../server/master/state/MetaDataStateStore.java |  13 +-
 .../server/master/state/TServerInstance.java    |  13 +-
 .../accumulo/server/util/ListInstances.java     |   2 +-
 .../server/util/MasterMetadataUtil.java         |   4 +-
 .../server/util/TNonblockingServerSocket.java   |   2 +-
 .../accumulo/server/util/TServerUtils.java      |   7 +-
 .../server/util/VerifyTabletAssignments.java    |   5 +-
 .../org/apache/accumulo/server/util/ZooZap.java |   6 +-
 .../security/handler/ZKAuthenticatorTest.java   |   6 +-
 .../gc/GarbageCollectWriteAheadLogs.java        |   8 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  13 +-
 .../accumulo/monitor/servlets/LogServlet.java   |   2 +-
 server/tserver/pom.xml                          |  15 +
 .../apache/accumulo/tserver/InMemoryMap.java    |   4 +-
 .../apache/accumulo/tserver/SessionManager.java | 314 +++++++++++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 344 +-----------------
 .../apache/accumulo/tserver/WriteTracker.java   |  96 ++++++
 .../apache/accumulo/tserver/log/DfsLogger.java  |  21 +-
 .../replication/AccumuloReplicaSystem.java      |   5 +
 .../replication/ReplicationProcessor.java       |   3 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  80 +++--
 .../tserver/TservConstraintEnvTest.java         |  54 +++
 .../accumulo/tserver/tablet/TabletTest.java     |  83 +++++
 .../shell/commands/DropUserCommand.java         |  44 ++-
 .../accumulo/shell/commands/MaxRowCommand.java  |   5 +-
 .../accumulo/shell/commands/ScanCommand.java    |   2 +-
 .../shell/command/DropUserCommandTest.java      |  83 +++++
 .../java/org/apache/accumulo/start/Main.java    | 156 +++++----
 .../org/apache/accumulo/test/CreateRFiles.java  |   5 +-
 .../accumulo/test/NativeMapConcurrencyTest.java |   5 +-
 .../accumulo/test/NativeMapStressTest.java      |   6 +-
 .../accumulo/test/QueryMetadataTable.java       |  11 +-
 .../continuous/ContinuousStatsCollector.java    |   6 +-
 .../test/continuous/ContinuousVerify.java       |   4 +-
 .../accumulo/test/continuous/Histogram.java     |   8 +-
 .../test/continuous/PrintScanTimeHistogram.java |  22 +-
 .../metadata/MetadataBatchScanTest.java         |   5 +-
 .../performance/scan/CollectTabletStats.java    |   6 +-
 .../apache/accumulo/test/randomwalk/Module.java |   5 +-
 .../accumulo/test/scalability/Ingest.java       |  20 +-
 .../apache/accumulo/test/scalability/Run.java   |  12 +-
 .../accumulo/test/ConditionalWriterIT.java      |   2 +-
 .../test/MasterRepairsDualAssignmentIT.java     |   5 +-
 .../org/apache/accumulo/test/ShellServerIT.java |   3 +-
 .../accumulo/test/SplitCancelsMajCIT.java       |  83 +++++
 .../functional/DeletedTablesDontFlushIT.java    |  56 +++
 .../test/functional/GarbageCollectorIT.java     |   2 +-
 .../test/functional/SplitRecoveryIT.java        |   2 +-
 .../test/replication/ReplicationTest.java       |  24 +-
 .../system/continuous/continuous-env.sh.example |  30 +-
 test/system/continuous/start-agitator.sh        |  12 +-
 test/system/scalability/run.py                  |   9 +-
 106 files changed, 2114 insertions(+), 721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 269e15e,56a0fd5..03e0737
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -313,17 -273,17 +313,19 @@@ public class GarbageCollectWriteAheadLo
      }
      return result;
    }
 -  
 -  private int removeMetadataEntries(Map<String,Path>  nameToFileMap, Map<String, Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
 -      InterruptedException {
 +
 +  protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status, Credentials creds) throws IOException,
 +      KeeperException, InterruptedException {
      int count = 0;
 -    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
 +    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(creds);
  
 +    // For each WAL reference in the metadata table
      while (iterator.hasNext()) {
 +      // Each metadata reference has at least one WAL file
        for (String entry : iterator.next().logSet) {
-         String uuid = new Path(entry).getName();
+         // old style WALs will have the IP:Port of their logger and new style will be a Path, either absolute or relative; in all cases
+         // the last "/" will mark a UUID file name.
+         String uuid = entry.substring(entry.lastIndexOf("/") + 1);
          if (!isUUID(uuid)) {
          // fully expect this to be a uuid; if it's not, then something is wrong and walog GC should not proceed!
            throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry);
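
Per the comment above, old style entries carry the IP:Port of their logger while new
style entries are absolute or relative Paths; in every case the UUID file name is
whatever follows the last "/". A minimal sketch of the extraction, with hypothetical
entry values:

    // Everything after the last "/" is expected to be the WAL's UUID file name.
    static String uuidName(String entry) {
      return entry.substring(entry.lastIndexOf("/") + 1);
    }

    // Hypothetical inputs, all yielding "5c6a4a52-3c53-4a43-aab8-6ee4d2b3b3f1":
    //   uuidName("10.0.0.1:9997/5c6a4a52-3c53-4a43-aab8-6ee4d2b3b3f1")               old style
    //   uuidName("hdfs://nn:8020/accumulo/wal/5c6a4a52-3c53-4a43-aab8-6ee4d2b3b3f1") absolute
    //   uuidName("wal/5c6a4a52-3c53-4a43-aab8-6ee4d2b3b3f1")                         relative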

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/tserver/pom.xml
----------------------------------------------------------------------
diff --cc server/tserver/pom.xml
index e09f348,0687790..163fccb
--- a/server/tserver/pom.xml
+++ b/server/tserver/pom.xml
@@@ -105,6 -97,35 +105,21 @@@
        <artifactId>easymock</artifactId>
        <scope>test</scope>
      </dependency>
 -         <dependency>
 -      <groupId>org.easymock</groupId>
 -      <artifactId>easymock</artifactId>
 -      <scope>test</scope>
 -    </dependency>
+     <dependency>
+       <groupId>org.powermock</groupId>
+       <artifactId>powermock-api-easymock</artifactId>
+       <scope>test</scope>
+     </dependency>
+     <dependency>
+       <groupId>org.powermock</groupId>
+       <artifactId>powermock-core</artifactId>
+       <scope>test</scope>
+     </dependency>
+     <dependency>
+       <groupId>org.powermock</groupId>
+       <artifactId>powermock-module-junit4</artifactId>
+       <scope>test</scope>
 -    </dependency><dependency>
 -      <groupId>org.slf4j</groupId>
 -      <artifactId>slf4j-api</artifactId>
 -      <scope>test</scope>
 -    </dependency>
 -    <dependency>
 -      <groupId>org.slf4j</groupId>
 -      <artifactId>slf4j-log4j12</artifactId>
 -      <scope>test</scope>
+     </dependency>
    </dependencies>
    <build>
      <pluginManagement>
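
The PowerMock artifacts added above (api-easymock, core, and the JUnit 4 module)
presumably support the new tserver tests that need to mock static or final code, which
plain EasyMock cannot do. A minimal sketch of such a test, assuming a hypothetical
StaticUtil class with a static method to stub:

    import static org.junit.Assert.assertEquals;

    import org.easymock.EasyMock;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.api.easymock.PowerMock;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    // PowerMockRunner plus @PrepareForTest rewrite StaticUtil's bytecode so its
    // static method can be stubbed; StaticUtil is hypothetical, not from this commit.
    @RunWith(PowerMockRunner.class)
    @PrepareForTest(StaticUtil.class)
    public class StaticUtilTest {
      @Test
      public void stubsStaticCall() {
        PowerMock.mockStatic(StaticUtil.class);
        EasyMock.expect(StaticUtil.count()).andReturn(42);
        PowerMock.replayAll();
        assertEquals(42, StaticUtil.count());
        PowerMock.verifyAll();
      }
    }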

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 7233187,0000000..5717185
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@@ -1,691 -1,0 +1,696 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.replication;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.ZooKeeperInstance;
 +import org.apache.accumulo.core.client.impl.ClientExecReturn;
 +import org.apache.accumulo.core.client.impl.ReplicationClient;
 +import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
 +import org.apache.accumulo.core.client.replication.ReplicaSystem;
 +import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.file.rfile.RFile;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.replication.thrift.KeyValues;
 +import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 +import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
 +import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
 +import org.apache.accumulo.core.replication.thrift.WalEdits;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.core.security.thrift.TCredentials;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.fs.VolumeManagerImpl;
 +import org.apache.accumulo.trace.instrument.Span;
 +import org.apache.accumulo.trace.instrument.Trace;
 +import org.apache.accumulo.tserver.log.DfsLogger;
 +import org.apache.accumulo.tserver.log.DfsLogger.DFSLoggerInputStreams;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.thrift.transport.TTransportException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * A {@link ReplicaSystem} that replicates WAL data (and, once implemented, RFile data) to a peer Accumulo instance.
 + */
 +public class AccumuloReplicaSystem implements ReplicaSystem {
 +  private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
 +  private static final String RFILE_SUFFIX = "." + RFile.EXTENSION;
 +
 +  private String instanceName, zookeepers;
 +  private AccumuloConfiguration conf;
 +  private VolumeManager fs;
 +
 +  protected String getInstanceName() {
 +    return instanceName;
 +  }
 +
 +  protected void setInstanceName(String instanceName) {
 +    this.instanceName = instanceName;
 +  }
 +
 +  protected String getZookeepers() {
 +    return zookeepers;
 +  }
 +
 +  protected void setZookeepers(String zookeepers) {
 +    this.zookeepers = zookeepers;
 +  }
 +
 +  protected AccumuloConfiguration getConf() {
 +    return conf;
 +  }
 +
 +  protected void setConf(AccumuloConfiguration conf) {
 +    this.conf = conf;
 +  }
 +
 +  protected VolumeManager getFs() {
 +    return fs;
 +  }
 +
 +  protected void setFs(VolumeManager fs) {
 +    this.fs = fs;
 +  }
 +
 +  /**
 +   * Generate the configuration string for this ReplicaSystem
 +   */
 +  public static String buildConfiguration(String instanceName, String zookeepers) {
 +    return instanceName + "," + zookeepers;
 +  }
 +
 +  @Override
 +  public void configure(String configuration) {
 +    Preconditions.checkNotNull(configuration);
 +
 +    // instance_name,zookeepers
 +    int index = configuration.indexOf(',');
 +    if (-1 == index) {
++      try {
++        Thread.sleep(1000);
++      } catch (InterruptedException e) {
++        Thread.currentThread().interrupt();
++      }
 +      throw new IllegalArgumentException("Expected comma in configuration string");
 +    }
 +
 +    instanceName = configuration.substring(0, index);
 +    zookeepers = configuration.substring(index + 1);
 +
 +    conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
 +
 +    try {
 +      fs = VolumeManagerImpl.get(conf);
 +    } catch (IOException e) {
 +      log.error("Could not connect to filesystem", e);
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  @Override
 +  public Status replicate(final Path p, final Status status, final ReplicationTarget target, final ReplicaSystemHelper helper) {
 +    final Instance localInstance = HdfsZooInstance.getInstance();
 +    final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
 +    Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
 +    final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
 +
 +    try {
 +      Trace.on("AccumuloReplicaSystem");
 +
 +      Instance peerInstance = getPeerInstance(target);
 +      // Remote identifier is an integer (table id) in this case.
 +      final int remoteTableId = Integer.parseInt(target.getRemoteIdentifier());
 +
 +      // Attempt the replication of this status a number of times before giving up and
 +      // trying to replicate it again later some other time.
 +      int numAttempts = localConf.getCount(Property.REPLICATION_WORK_ATTEMPTS);
 +      for (int i = 0; i < numAttempts; i++) {
 +        String peerTserver;
 +        Span span = Trace.start("Fetch peer tserver");
 +        try {
 +          // Ask the master on the remote what TServer we should talk with to replicate the data
 +          peerTserver = ReplicationClient.executeCoordinatorWithReturn(peerInstance, new ClientExecReturn<String,ReplicationCoordinator.Client>() {
 +
 +            @Override
 +            public String execute(ReplicationCoordinator.Client client) throws Exception {
 +              return client.getServicerAddress(remoteTableId, tCredsForPeer);
 +            }
 +
 +          });
 +        } catch (AccumuloException | AccumuloSecurityException e) {
 +          // No progress is made
 +          log.error("Could not connect to master at {}, cannot proceed with replication. Will retry", target, e);
 +          continue;
 +        } finally {
 +          span.stop();
 +        }
 +
 +        if (null == peerTserver) {
 +          // Something went wrong, and we didn't get a valid tserver from the remote for some reason
 +          log.warn("Did not receive tserver from master at {}, cannot proceed with replication. Will retry.", target);
 +          continue;
 +        }
 +
 +        // We have a tserver on the remote -- send the data its way.
 +        Status finalStatus;
 +        final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
 +        try {
 +          if (p.getName().endsWith(RFILE_SUFFIX)) {
 +            span = Trace.start("RFile replication");
 +            try {
 +              finalStatus = replicateRFiles(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
 +            } finally {
 +              span.stop();
 +            }
 +          } else {
 +            span = Trace.start("WAL replication");
 +            try {
 +              finalStatus = replicateLogs(peerInstance, peerTserver, target, p, status, sizeLimit, remoteTableId, tCredsForPeer, helper);
 +            } finally {
 +              span.stop();
 +            }
 +          }
 +
 +          log.debug("New status for {} after replicating to {} is {}", p, peerInstance, ProtobufUtil.toString(finalStatus));
 +
 +          return finalStatus;
 +        } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
 +          log.warn("Could not connect to remote server {}, will retry", peerTserver, e);
 +          UtilWaitThread.sleep(1000);
 +        }
 +      }
 +
 +      log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", numAttempts, p);
 +
 +      // We made no progress; punt on this status for now and let it re-queue itself for work
 +      return status;
 +    } finally {
 +      Trace.offNoFlush();
 +    }
 +  }
 +
 +  protected Status replicateRFiles(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
 +      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
 +      AccumuloException, AccumuloSecurityException {
 +    DataInputStream input;
 +    try {
 +      input = getRFileInputStream(p);
 +    } catch (IOException e) {
 +      log.error("Could not create input stream from RFile, will retry", e);
 +      return status;
 +    }
 +
 +    Status lastStatus = status, currentStatus = status;
 +    while (true) {
 +      // Read and send a batch of mutations
 +      ReplicationStats replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new RFileClientExecReturn(target, input, p,
 +          currentStatus, sizeLimit, remoteTableId, tcreds));
 +
 +      // Catch the overflow
 +      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
 +      if (newBegin < 0) {
 +        newBegin = Long.MAX_VALUE;
 +      }
 +
 +      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 +
 +      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
 +
 +      // If we got a different status
 +      if (!currentStatus.equals(lastStatus)) {
 +        // If we don't have any more work, just quit
 +        if (!StatusUtil.isWorkRequired(currentStatus)) {
 +          return currentStatus;
 +        } else {
 +          // Otherwise, let it loop and replicate some more data
 +          lastStatus = currentStatus;
 +        }
 +      } else {
 +        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 +
 +        // otherwise, we didn't actually replicate (likely because there was an error sending the data)
 +        // we can just not record any updates, and it will be picked up again by the work assigner
 +        return status;
 +      }
 +    }
 +  }
 +
 +  protected Status replicateLogs(final Instance peerInstance, final String peerTserver, final ReplicationTarget target, final Path p, final Status status,
 +      final long sizeLimit, final int remoteTableId, final TCredentials tcreds, final ReplicaSystemHelper helper) throws TTransportException,
 +      AccumuloException, AccumuloSecurityException {
 +
 +    final Set<Integer> tids;
 +    final DataInputStream input;
 +    Span span = Trace.start("Read WAL header");
 +    span.data("file", p.toString());
 +    try {
 +      input = getWalStream(p);
 +    } catch (IOException e) {
 +      log.error("Could not create stream for WAL", e);
 +      // No data sent (bytes nor records) and no progress made
 +      return status;
 +    } finally {
 +      span.stop();
 +    }
 +
 +    span = Trace.start("Consume WAL prefix");
 +    span.data("file", p.toString());
 +    try {
 +      // We want to read all records in the WAL up to the "begin" offset contained in the Status message,
 +      // building a Set of tids from DEFINE_TABLET events which correspond to table ids for future mutations
 +      tids = consumeWalPrefix(target, input, p, status, sizeLimit);
 +    } catch (IOException e) {
 +      log.warn("Unexpected error consuming file.");
 +      return status;
 +    } finally {
 +      span.stop();
 +    }
 +
 +    Status lastStatus = status, currentStatus = status;
 +    while (true) {
 +      // Set some trace info
 +      span = Trace.start("Replicate WAL batch");
 +      span.data("Batch size (bytes)", Long.toString(sizeLimit));
 +      span.data("File", p.toString());
 +      span.data("Peer instance name", peerInstance.getInstanceName());
 +      span.data("Peer tserver", peerTserver);
 +      span.data("Remote table ID", Integer.toString(remoteTableId));
 +
 +      ReplicationStats replResult;
 +      try {
 +        // Read and send a batch of mutations
 +        replResult = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new WalClientExecReturn(target, input, p, currentStatus, sizeLimit,
 +            remoteTableId, tcreds, tids));
 +      } finally {
 +        span.stop();
 +      }
 +
 +      // Catch the overflow
 +      long newBegin = currentStatus.getBegin() + replResult.entriesConsumed;
 +      if (newBegin < 0) {
 +        newBegin = Long.MAX_VALUE;
 +      }
 +
 +      currentStatus = Status.newBuilder(currentStatus).setBegin(newBegin).build();
 +
 +      log.debug("Sent batch for replication of {} to {}, with new Status {}", p, target, ProtobufUtil.toString(currentStatus));
 +
 +      // If we got a different status
 +      if (!currentStatus.equals(lastStatus)) {
 +        span = Trace.start("Update replication table");
 +        try {
 +          helper.recordNewStatus(p, currentStatus, target);
 +        } catch (TableNotFoundException e) {
 +          log.error("Tried to update status in replication table for {} as {}, but the table did not exist", p, ProtobufUtil.toString(currentStatus), e);
 +          throw new RuntimeException("Replication table did not exist, will retry", e);
 +        } finally {
 +          span.stop();
 +        }
 +
 +        // If we don't have any more work, just quit
 +        if (!StatusUtil.isWorkRequired(currentStatus)) {
 +          return currentStatus;
 +        } else {
 +          // Otherwise, let it loop and replicate some more data
 +          lastStatus = currentStatus;
 +        }
 +      } else {
 +        log.debug("Did not replicate any new data for {} to {}, (state was {})", p, target, ProtobufUtil.toString(lastStatus));
 +
 +        // otherwise, we didn't actually replicate (likely because there was an error sending the data)
 +        // we can just not record any updates, and it will be picked up again by the work assigner
 +        return status;
 +      }
 +    }
 +  }
 +
 +  protected class WalClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
 +
 +    private ReplicationTarget target;
 +    private DataInputStream input;
 +    private Path p;
 +    private Status status;
 +    private long sizeLimit;
 +    private int remoteTableId;
 +    private TCredentials tcreds;
 +    private Set<Integer> tids;
 +
 +    public WalClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds,
 +        Set<Integer> tids) {
 +      this.target = target;
 +      this.input = input;
 +      this.p = p;
 +      this.status = status;
 +      this.sizeLimit = sizeLimit;
 +      this.remoteTableId = remoteTableId;
 +      this.tcreds = tcreds;
 +      this.tids = tids;
 +    }
 +
 +    @Override
 +    public ReplicationStats execute(Client client) throws Exception {
 +      WalReplication edits = getWalEdits(target, input, p, status, sizeLimit, tids);
 +
 +      log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == edits.entriesConsumed) ? "all"
 +          : edits.entriesConsumed, edits.sizeInBytes, target);
 +
 +      // If we have some edits to send
 +      if (0 < edits.walEdits.getEditsSize()) {
 +        long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tcreds);
 +        if (entriesReplicated != edits.numUpdates) {
 +          log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
 +        }
 +
 +        // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
 +        // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
 +        return edits;
 +      } else if (edits.entriesConsumed > 0) {
 +        // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
 +        // log entries multiple times to determine if they should be sent
 +        return edits;
 +      }
 +
 +      // No data sent (bytes nor records) and no progress made
 +      return new ReplicationStats(0L, 0L, 0L);
 +    }
 +  }
 +
 +  protected class RFileClientExecReturn implements ClientExecReturn<ReplicationStats,ReplicationServicer.Client> {
 +
 +    private ReplicationTarget target;
 +    private DataInputStream input;
 +    private Path p;
 +    private Status status;
 +    private long sizeLimit;
 +    private int remoteTableId;
 +    private TCredentials tcreds;
 +
 +    public RFileClientExecReturn(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit, int remoteTableId, TCredentials tcreds) {
 +      this.target = target;
 +      this.input = input;
 +      this.p = p;
 +      this.status = status;
 +      this.sizeLimit = sizeLimit;
 +      this.remoteTableId = remoteTableId;
 +      this.tcreds = tcreds;
 +    }
 +
 +    @Override
 +    public ReplicationStats execute(Client client) throws Exception {
 +      RFileReplication kvs = getKeyValues(target, input, p, status, sizeLimit);
 +      if (0 < kvs.keyValues.getKeyValuesSize()) {
 +        long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tcreds);
 +        if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
 +          log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(), entriesReplicated);
 +        }
 +
 +        // Not as important to track as WALs because we don't skip any KVs in an RFile
 +        return kvs;
 +      }
 +
 +      // No data sent (bytes nor records) and no progress made
 +      return new ReplicationStats(0L, 0L, 0L);
 +    }
 +  }
 +
 +  protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
 +    Preconditions.checkNotNull(conf);
 +    Preconditions.checkNotNull(target);
 +
 +    String peerName = target.getPeerName();
 +    String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
 +    Map<String,String> peerUsers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
 +    Map<String,String> peerPasswords = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
 +
 +    String user = peerUsers.get(userKey);
 +    String password = peerPasswords.get(passwordKey);
 +    if (null == user || null == password) {
 +      throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate");
 +    }
 +
 +    return new Credentials(user, new PasswordToken(password));
 +  }
 +
 +  protected Instance getPeerInstance(ReplicationTarget target) {
 +    return new ZooKeeperInstance(instanceName, zookeepers);
 +  }
 +
 +  protected RFileReplication getKeyValues(ReplicationTarget target, DataInputStream input, Path p, Status status, long sizeLimit) {
 +    // TODO ACCUMULO-2580 Implement me
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  protected Set<Integer> consumeWalPrefix(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    LogFileValue value = new LogFileValue();
 +
 +    Set<Integer> desiredTids = new HashSet<>();
 +
 +    // Read through the stuff we've already processed in a previous replication attempt
 +    // We also need to track the tids that occurred earlier in the file as mutations
 +    // later on might use that tid
 +    for (long i = 0; i < status.getBegin(); i++) {
 +      key.readFields(wal);
 +      value.readFields(wal);
 +
 +      switch (key.event) {
 +        case DEFINE_TABLET:
 +          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
 +            desiredTids.add(key.tid);
 +          }
 +          break;
 +        default:
 +          break;
 +      }
 +    }
 +
 +    return desiredTids;
 +  }
 +
 +  public DataInputStream getWalStream(Path p) throws IOException {
 +    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
 +    return streams.getDecryptingInputStream();
 +  }
 +
 +  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit, Set<Integer> desiredTids)
 +      throws IOException {
 +    WalEdits edits = new WalEdits();
 +    edits.edits = new ArrayList<ByteBuffer>();
 +    long size = 0L;
 +    long entriesConsumed = 0L;
 +    long numUpdates = 0L;
 +    LogFileKey key = new LogFileKey();
 +    LogFileValue value = new LogFileValue();
 +
 +    while (size < sizeLimit) {
 +      try {
 +        key.readFields(wal);
 +        value.readFields(wal);
 +      } catch (EOFException e) {
 +        log.debug("Caught EOFException reading {}", p);
 +        if (status.getInfiniteEnd() && status.getClosed()) {
 +          log.debug("{} is closed and has unknown length, assuming entire file has been consumed", p);
 +          entriesConsumed = Long.MAX_VALUE;
 +        }
 +        break;
 +      }
 +
 +      entriesConsumed++;
 +
 +      switch (key.event) {
 +        case DEFINE_TABLET:
 +          // For new DEFINE_TABLETs, we also need to record the new tids we see
 +          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
 +            desiredTids.add(key.tid);
 +          }
 +          break;
 +        case MUTATION:
 +        case MANY_MUTATIONS:
 +          // Only write out mutations for tids that are for the desired tablet
 +          if (desiredTids.contains(key.tid)) {
 +            ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +            DataOutputStream out = new DataOutputStream(baos);
 +
 +            key.write(out);
 +
 +            // Only write out the mutations that don't have the given ReplicationTarget
 +            // as a replicate source (this prevents infinite replication loops: a->b, b->a, repeat)
 +            numUpdates += writeValueAvoidingReplicationCycles(out, value, target);
 +
 +            out.flush();
 +            byte[] data = baos.toByteArray();
 +            size += data.length;
 +            edits.addToEdits(ByteBuffer.wrap(data));
 +          }
 +          break;
 +        default:
 +          log.trace("Ignorning WAL entry which doesn't contain mutations");
 +          break;
 +      }
 +    }
 +
 +    return new WalReplication(edits, size, entriesConsumed, numUpdates);
 +  }
 +
 +  /**
 +   * Wrapper around {@link LogFileValue#write(java.io.DataOutput)} which does not serialize {@link Mutation}s that do not need to be replicated to the given
 +   * {@link ReplicationTarget}
 +   * 
 +   * @throws IOException
 +   */
 +  protected long writeValueAvoidingReplicationCycles(DataOutputStream out, LogFileValue value, ReplicationTarget target) throws IOException {
 +    int mutationsToSend = 0;
 +    for (Mutation m : value.mutations) {
 +      if (!m.getReplicationSources().contains(target.getPeerName())) {
 +        mutationsToSend++;
 +      }
 +    }
 +
 +    int mutationsRemoved = value.mutations.size() - mutationsToSend;
 +    if (mutationsRemoved > 0) {
 +      log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", mutationsRemoved, target.getPeerName());
 +    }
 +
 +    out.writeInt(mutationsToSend);
 +    for (Mutation m : value.mutations) {
 +      // If we haven't yet replicated to this peer
 +      if (!m.getReplicationSources().contains(target.getPeerName())) {
 +        // Add our name, and send it
 +        String name = conf.get(Property.REPLICATION_NAME);
 +        if (StringUtils.isBlank(name)) {
 +          throw new IllegalArgumentException("Local system has no replication name configured");
 +        }
 +
 +        m.addReplicationSource(name);
 +
 +        m.write(out);
 +      }
 +    }
 +
 +    return mutationsToSend;
 +  }
 +
 +  protected DataInputStream getRFileInputStream(Path p) throws IOException {
 +    throw new UnsupportedOperationException("Not yet implemented");
 +  }
 +
 +  public static class ReplicationStats {
 +    /**
 +     * The size, in bytes, of the data sent
 +     */
 +    public long sizeInBytes;
 +
 +    /**
 +     * The number of records sent
 +     */
 +    public long sizeInRecords;
 +
 +    /**
 +     * The number of entries consumed from the log (to increment {@link Status}'s begin)
 +     */
 +    public long entriesConsumed;
 +
 +    public ReplicationStats(long sizeInBytes, long sizeInRecords, long entriesConsumed) {
 +      this.sizeInBytes = sizeInBytes;
 +      this.sizeInRecords = sizeInRecords;
 +      this.entriesConsumed = entriesConsumed;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (null != o && ReplicationStats.class.isAssignableFrom(o.getClass())) {
 +        ReplicationStats other = (ReplicationStats) o;
 +        return sizeInBytes == other.sizeInBytes && sizeInRecords == other.sizeInRecords && entriesConsumed == other.entriesConsumed;
 +      }
 +      return false;
 +    }
 +  }
 +
 +  public static class RFileReplication extends ReplicationStats {
 +    /**
 +     * The data to send
 +     */
 +    public KeyValues keyValues;
 +
 +    public RFileReplication(KeyValues kvs, long size) {
 +      super(size, kvs.keyValues.size(), kvs.keyValues.size());
 +      this.keyValues = kvs;
 +    }
 +  }
 +
 +  /**
 +   * A "struct" to avoid a nested Entry. Contains the resultant information from collecting data for replication
 +   */
 +  public static class WalReplication extends ReplicationStats {
 +    /**
 +     * The data to send over the wire
 +     */
 +    public WalEdits walEdits;
 +
 +    /**
 +     * The number of updates contained in this batch
 +     */
 +    public long numUpdates;
 +
 +    public WalReplication(WalEdits edits, long size, long entriesConsumed, long numMutations) {
 +      super(size, edits.getEditsSize(), entriesConsumed);
 +      this.walEdits = edits;
 +      this.numUpdates = numMutations;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof WalReplication) {
 +        WalReplication other = (WalReplication) o;
 +
 +        return super.equals(other) && walEdits.equals(other.walEdits) && numUpdates == other.numUpdates;
 +      }
 +
 +      return false;
 +    }
 +  }
 +}
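
One subtlety in configure() above: it splits at the first comma only, so the zookeepers
half of the "instance_name,zookeepers" string may itself be a comma-separated quorum.
A minimal sketch of the round trip, with hypothetical instance and host names:

    // buildConfiguration joins the two pieces with a single comma.
    String config = AccumuloReplicaSystem.buildConfiguration("peerInstance", "zk1:2181,zk2:2181,zk3:2181");

    // configure() splits at the FIRST comma, leaving the quorum intact.
    int index = config.indexOf(',');
    String instanceName = config.substring(0, index); // "peerInstance"
    String zookeepers = config.substring(index + 1);  // "zk1:2181,zk2:2181,zk3:2181"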

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 3b3df87,0000000..b8b83af
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@@ -1,179 -1,0 +1,180 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.replication;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +import java.util.NoSuchElementException;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.replication.ReplicaSystem;
 +import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicaSystemHelper;
 +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 +import org.apache.accumulo.core.replication.ReplicationTarget;
 +import org.apache.accumulo.core.replication.StatusUtil;
 +import org.apache.accumulo.core.replication.proto.Replication.Status;
 +import org.apache.accumulo.core.security.Credentials;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
 +import org.apache.accumulo.server.replication.ReplicationTable;
 +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 +import org.apache.hadoop.fs.Path;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.protobuf.InvalidProtocolBufferException;
 +
 +/**
 + * Transmit the given data to a peer
 + */
 +public class ReplicationProcessor implements Processor {
 +  private static final Logger log = LoggerFactory.getLogger(ReplicationProcessor.class);
 +
 +  private final Instance inst;
 +  private final AccumuloConfiguration conf;
 +  private final VolumeManager fs;
 +  private final Credentials creds;
 +  private final ReplicaSystemHelper helper;
 +
 +  public ReplicationProcessor(Instance inst, AccumuloConfiguration conf, VolumeManager fs, Credentials creds) {
 +    this.inst = inst;
 +    this.conf = conf;
 +    this.fs = fs;
 +    this.creds = creds;
 +    this.helper = new ReplicaSystemHelper(inst, creds);
 +  }
 +
 +  @Override
 +  public ReplicationProcessor newProcessor() {
 +    return new ReplicationProcessor(inst, new ServerConfiguration(inst).getConfiguration(), fs, creds);
 +  }
 +
 +  @Override
 +  public void process(String workID, byte[] data) {
 +    ReplicationTarget target = DistributedWorkQueueWorkAssignerHelper.fromQueueKey(workID).getValue();
 +    String file = new String(data);
 +
 +    log.debug("Received replication work for {} to {}", file, target);
 +
 +    ReplicaSystem replica;
 +    try {
 +      replica = getReplicaSystem(target);
 +    } catch (Exception e) {
 +      log.error("Could not instantiate ReplicaSystem for {}, waiting before returning the work", target, e);
 +      try {
-         Thread.sleep(10000);
++        // TODO configurable
++        Thread.sleep(5000);
 +      } catch (InterruptedException ie) {
 +        Thread.currentThread().interrupt();
 +      }
 +
 +      return;
 +    }
 +
 +    Status status;
 +    try {
 +      status = getStatus(file, target);
 +    } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
 +      log.error("Could not look for replication record", e);
 +      throw new IllegalStateException("Could not look for replication record", e);
 +    } catch (InvalidProtocolBufferException e) {
 +      log.error("Could not deserialize Status from Work section for {} and ", file, target);
 +      throw new RuntimeException("Could not parse Status for work record", e);
 +    } catch (NoSuchElementException e) {
 +      log.error("Assigned work for {} to {} but could not find work record", file, target);
 +      return;
 +    }
 +
 +    log.debug("Current status for {} replicating to {}: {}", file, target, ProtobufUtil.toString(status));
 +
 +    // We don't need to do anything (shouldn't have gotten this work record in the first place)
 +    if (!StatusUtil.isWorkRequired(status)) {
 +      log.info("Received work request for {} and {}, but it does not need replication. Ignoring...", file, target);
 +      return;
 +    }
 +
 +    // Sanity check that nothing bad happened and our replication source still exists
 +    Path filePath = new Path(file);
 +    try {
 +      if (!doesFileExist(filePath, target)) {
 +        return;
 +      }
 +    } catch (IOException e) {
 +      log.error("Could not determine if file exists {}", filePath, e);
 +      throw new RuntimeException(e);
 +    }
 +
 +    log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
 +
 +    replica.replicate(filePath, status, target, getHelper());
 +  }
 +
 +  protected ReplicaSystemHelper getHelper() {
 +    return helper;
 +  }
 +
 +  protected ReplicaSystem getReplicaSystem(ReplicationTarget target) {
 +    // Find the configured replication peer so we know how to replicate to it
 +    // Classname,Configuration
 +    String peerType = getPeerType(target.getPeerName());
 +
 +    // Get the peer that we're replicating to
 +    return ReplicaSystemFactory.get(peerType);
 +  }
 +
 +  protected String getPeerType(String peerName) {
 +    // Find the configured replication peer so we know how to replicate to it
 +    Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
 +    String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
 +    if (null == peerType) {
 +      String msg = "Cannot process replication for unknown peer: " + peerName;
 +      log.warn(msg);
 +      throw new IllegalArgumentException(msg);
 +    }
 +
 +    return peerType;
 +  }
 +
 +  protected boolean doesFileExist(Path filePath, ReplicationTarget target) throws IOException {
 +    if (!fs.exists(filePath)) {
 +      log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
 +      return false;
 +    }
 +
 +    return true;
 +  }
 +
 +  protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
 +      InvalidProtocolBufferException {
 +    Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
 +    s.setRange(Range.exact(file));
 +    s.fetchColumn(WorkSection.NAME, target.toText());
 +
 +    return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
 +  }
 +}
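
getPeerType() above resolves a peer purely by property naming convention: each peer is
a Property.REPLICATION_PEERS entry keyed by peer name whose value, per the
"Classname,Configuration" comment, is a ReplicaSystem class name followed by its
configuration string. A minimal sketch, with a hypothetical peer named "otherCluster":

    // Hypothetical site configuration entry for the peer (one logical line):
    //   replication.peer.otherCluster=
    //     org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,peerInstance,zk1:2181

    // Mirrors the lookup in getPeerType(): prefix scan, then exact key.
    Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
    String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + "otherCluster");
    // peerType is the full "Classname,Configuration" value handed to ReplicaSystemFactory.get()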

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f8831026/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index e6cbf0f,7182a34..ecdcb32
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@@ -566,26 -582,9 +588,27 @@@ public class Tablet implements TabletCo
          commitSession.updateMaxCommittedTime(tabletTime.getTime());
  
          if (count[0] == 0) {
+           log.debug("No replayed mutations applied, removing unused entries for " + extent);
            MetadataTableUtil.removeUnusedWALEntries(extent, logEntries, tabletServer.getLock());
 +
 +          // Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
 +          if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
 +            Status status = StatusUtil.fileClosed();
 +            for (LogEntry logEntry : logEntries) {
 +              log.debug("Writing closed status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
 +              ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
 +            }
 +          }
 +
            logEntries.clear();
 +        } else if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
 +          // The logs are about to be re-used, so we need to record that they have data for this extent,
 +          // but that they may get more data
 +          Status status = StatusUtil.openWithUnknownLength();
 +          for (LogEntry logEntry : logEntries) {
 +            log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
 +            ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
 +          }
          }
  
        } catch (Throwable t) {