You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/22 19:59:49 UTC

[1/5] git commit: ACCUMULO-378 Use the system configuration, not site configuration so we pull from zk too

Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 4ac04b95d -> e798d5008


ACCUMULO-378 Use the system configuration, not site configuration so we pull from zk too


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/da0a228d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/da0a228d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/da0a228d

Branch: refs/heads/ACCUMULO-378
Commit: da0a228d861952979b1dcc3bb561e2860719203a
Parents: 4ac04b9
Author: Josh Elser <el...@apache.org>
Authored: Wed May 21 16:49:43 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 21 16:49:43 2014 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/replication/AccumuloReplicaSystem.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/da0a228d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 64244c5..6cd3358 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -125,7 +125,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     instanceName = configuration.substring(0, index);
     zookeepers = configuration.substring(index + 1);
 
-    conf = ServerConfiguration.getSiteConfiguration();
+    conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
 
     try {
       fs = VolumeManagerImpl.get(conf);


[2/5] git commit: ACCUMULO-378 Lower the batchwriter "batch" size, and make it configurable.

Posted by el...@apache.org.
ACCUMULO-378 Lower the batchwriter "batch" size, and make it configurable.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3243d2ff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3243d2ff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3243d2ff

Branch: refs/heads/ACCUMULO-378
Commit: 3243d2ff9209246e7b03453460dfd4a3f231b190
Parents: da0a228
Author: Josh Elser <el...@apache.org>
Authored: Wed May 21 16:50:17 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 21 16:50:17 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/accumulo/core/conf/Property.java    |  2 ++
 .../replication/BatchWriterReplicationReplayer.java     | 12 ++++++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1ee499..f239756 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -281,6 +281,8 @@ public enum Property {
   @Experimental
   TSERV_REPLICATION_DEFAULT_HANDLER("tserver.replication.default.replayer", "org.apache.accumulo.tserver.replication.BatchWriterReplicationReplayer",
       PropertyType.CLASSNAME, "Default AccumuloReplicationReplayer implementation"),
+  @Experimental
+  TSERV_REPLICATION_BW_REPLAYER_MEMORY("tserver.replication.batchwriter.replayer.memory", "25M", PropertyType.MEMORY, "Memory to provide to batchwriter to replay mutations for replication"),
 
   // properties that are specific to logger server behavior
   LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3243d2ff/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 45c1409..ea50199 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -26,11 +26,15 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
 import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.slf4j.Logger;
@@ -45,8 +49,10 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
 
   @Override
   public long replicateLog(Connector conn, String tableName, WalEdits data) throws RemoteReplicationException {
+    final AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
     final LogFileKey key = new LogFileKey();
     final LogFileValue value = new LogFileValue();
+    final long memoryInBytes = conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY);
 
     BatchWriter bw = null;
     long mutationsApplied = 0l;
@@ -63,14 +69,16 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
 
         // Create the batchScanner if we don't already have one.
         if (null == bw) {
+          BatchWriterConfig bwConfig = new BatchWriterConfig();
+          bwConfig.setMaxMemory(memoryInBytes);
           try {
-            bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+            bw = conn.createBatchWriter(tableName, bwConfig);
           } catch (TableNotFoundException e) {
             throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table " + tableName + " does not exist");
           }
         }
 
-        log.info("Applying {} updates to table {} as part of batch", value.mutations.size(), tableName);
+        log.info("Applying {} mutations to table {} as part of batch", value.mutations.size(), tableName);
 
         try {
           bw.addMutations(value.mutations);


[5/5] git commit: ACCUMULO-2833 Add a StatusFormatter and configure it on metadata and replication tables

Posted by el...@apache.org.
ACCUMULO-2833 Add a StatusFormatter and configure it on metadata and replication tables


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e798d500
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e798d500
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e798d500

Branch: refs/heads/ACCUMULO-378
Commit: e798d50088933cbed7761b721a868d5f3da62935
Parents: 5917723
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 13:39:08 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 13:39:08 2014 -0400

----------------------------------------------------------------------
 .../accumulo/core/protobuf/ProtobufUtil.java    |   3 +-
 .../core/replication/StatusFormatter.java       | 187 +++++++++++++++++++
 .../server/replication/ReplicationTable.java    |  47 ++++-
 .../server/util/ReplicationTableUtil.java       |  48 ++++-
 .../server/util/ReplicationTableUtilTest.java   |   6 +
 .../DistributedWorkQueueWorkAssigner.java       |   2 +-
 .../replication/SequentialWorkAssigner.java     |   2 +-
 .../replication/ReplicationProcessor.java       |   6 +-
 8 files changed, 287 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java b/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java
index 30a0ac2..60eb840 100644
--- a/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/protobuf/ProtobufUtil.java
@@ -25,6 +25,7 @@ import com.google.protobuf.TextFormat;
  * Helper methods for interacting with Protocol Buffers and Accumulo
  */
 public class ProtobufUtil {
+  private static final char LEFT_BRACKET = '[', RIGHT_BRACKET = ']'; 
 
   public static Value toValue(GeneratedMessage msg) {
     return new Value(msg.toByteArray());
@@ -32,6 +33,6 @@ public class ProtobufUtil {
 
   public static String toString(GeneratedMessage msg) {
     // Too much typing
-    return TextFormat.shortDebugString(msg);
+    return LEFT_BRACKET + TextFormat.shortDebugString(msg) + RIGHT_BRACKET;
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
new file mode 100644
index 0000000..bc04480
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusFormatter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+import java.text.DateFormat;
+import java.text.FieldPosition;
+import java.text.ParsePosition;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.OrderSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.accumulo.core.util.format.Formatter;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Parse and print the serialized protocol buffers used to track replication data
+ */
+public class StatusFormatter implements Formatter {
+  private static final Logger log = LoggerFactory.getLogger(StatusFormatter.class);
+
+  private static final Set<Text> REPLICATION_COLFAMS = Collections.unmodifiableSet(Sets.newHashSet(ReplicationSection.COLF, StatusSection.NAME,
+      WorkSection.NAME, OrderSection.NAME));
+
+  private Iterator<Entry<Key,Value>> iterator;
+  private boolean printTimestamps;
+
+  /* so a new date object doesn't get created for every record in the scan result */
+  private static ThreadLocal<Date> tmpDate = new ThreadLocal<Date>() {
+    @Override
+    protected Date initialValue() {
+      return new Date();
+    }
+  };
+
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      return new DefaultDateFormat();
+    }
+
+    class DefaultDateFormat extends DateFormat {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public StringBuffer format(Date date, StringBuffer toAppendTo, FieldPosition fieldPosition) {
+        toAppendTo.append(Long.toString(date.getTime()));
+        return toAppendTo;
+      }
+
+      @Override
+      public Date parse(String source, ParsePosition pos) {
+        return new Date(Long.parseLong(source));
+      }
+
+    }
+  };
+
+  @Override
+  public boolean hasNext() {
+    return iterator.hasNext();
+  }
+
+  @Override
+  public String next() {
+    Entry<Key,Value> entry = iterator.next();
+    DateFormat timestampFormat = printTimestamps ? formatter.get() : null;
+
+    // If we expected this to be a protobuf, try to parse it, adding a message when it fails to parse
+    if (REPLICATION_COLFAMS.contains(entry.getKey().getColumnFamily())) {
+      Status status;
+      try {
+        status = Status.parseFrom(entry.getValue().get());
+      } catch (InvalidProtocolBufferException e) {
+        log.trace("Could not deserialize protocol buffer for {}", entry.getKey(), e);
+        status = null;
+      }
+
+      return formatEntry(entry.getKey(), status, timestampFormat);
+    } else {
+      // Otherwise, we're set on a table that contains other data too (e.g. accumulo.metadata)
+      // Just do the normal thing
+      return DefaultFormatter.formatEntry(entry, timestampFormat);
+    }
+  }
+
+  public String formatEntry(Key key, Status status, DateFormat timestampFormat) {
+    StringBuilder sb = new StringBuilder();
+    Text buffer = new Text();
+
+    // append row
+    key.getRow(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append column family
+    key.getColumnFamily(buffer);
+    appendText(sb, buffer).append(":");
+
+    // append column qualifier
+    key.getColumnQualifier(buffer);
+    appendText(sb, buffer).append(" ");
+
+    // append visibility expression
+    key.getColumnVisibility(buffer);
+    sb.append(new ColumnVisibility(buffer));
+
+    // append timestamp
+    if (timestampFormat != null) {
+      tmpDate.get().setTime(key.getTimestamp());
+      sb.append(" ").append(timestampFormat.format(tmpDate.get()));
+    }
+
+    sb.append("\t");
+    // append value
+    if (status != null) {
+      sb.append(ProtobufUtil.toString(status));
+    } else {
+      sb.append("Could not deserialize Status protocol buffer");
+    }
+
+    return sb.toString();
+  }
+
+  protected StringBuilder appendText(StringBuilder sb, Text t) {
+    return appendBytes(sb, t.getBytes(), 0, t.getLength());
+  }
+
+  protected String getValue(Value v) {
+    StringBuilder sb = new StringBuilder();
+    return appendBytes(sb, v.get(), 0, v.get().length).toString();
+  }
+
+  protected StringBuilder appendBytes(StringBuilder sb, byte ba[], int offset, int len) {
+    for (int i = 0; i < len; i++) {
+      int c = 0xff & ba[offset + i];
+      if (c == '\\')
+        sb.append("\\\\");
+      else if (c >= 32 && c <= 126)
+        sb.append((char) c);
+      else
+        sb.append("\\x").append(String.format("%02X", c));
+    }
+    return sb;
+  }
+
+  @Override
+  public void remove() {
+    iterator.remove();
+  }
+
+  @Override
+  public void initialize(Iterable<Entry<Key,Value>> scanner, boolean printTimestamps) {
+    this.iterator = scanner.iterator();
+    this.printTimestamps = printTimestamps;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
index 622ec10..68651ab 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationTable.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.AccumuloException;
@@ -34,20 +35,23 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.iterators.Combiner;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusFormatter;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 
 public class ReplicationTable extends org.apache.accumulo.core.client.replication.ReplicationTable {
-  private static final Logger log = Logger.getLogger(ReplicationTable.class);
+  private static final Logger log = LoggerFactory.getLogger(ReplicationTable.class);
 
   public static final String COMBINER_NAME = "statuscombiner";
 
@@ -56,6 +60,7 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
   public static final String WORK_LG_NAME = WorkSection.NAME.toString();
   public static final Set<Text> WORK_LG_COLFAMS = Collections.singleton(WorkSection.NAME);
   public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS);
+  public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName();
 
   public static synchronized void create(Connector conn) {
     TableOperations tops = conn.tableOperations();
@@ -146,6 +151,44 @@ public class ReplicationTable extends org.apache.accumulo.core.client.replicatio
       }
     }
 
+    // Make sure the StatusFormatter is set on the metadata table
+    Iterable<Entry<String,String>> properties;
+    try {
+      properties = tops.getProperties(NAME);
+    } catch (AccumuloException | TableNotFoundException e) {
+      log.error("Could not fetch table properties on replication table", e);
+      return false;
+    }
+
+    boolean formatterConfigured = false;
+    for (Entry<String,String> property : properties) {
+      if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
+        if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
+          log.info("Setting formatter for {} from {} to {}", NAME, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
+          try {
+            tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
+          } catch (AccumuloException | AccumuloSecurityException e) {
+            log.error("Could not set formatter on replication table", e);
+            return false;
+          }
+        }
+
+        formatterConfigured = true;
+
+        // Don't need to keep iterating over the properties after we found the one we were looking for
+        break;
+      }
+    }
+
+    if (!formatterConfigured) {
+      try {
+        tops.setProperty(NAME, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
+      } catch (AccumuloException | AccumuloSecurityException e) {
+        log.error("Could not set formatter on replication table", e);
+        return false;
+      }
+    }
+
     log.debug("Successfully configured replication table");
 
     return true;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
index 45f8fea..2a9774d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -31,6 +32,7 @@ import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.impl.Writer;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -40,6 +42,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.StatusFormatter;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -49,7 +52,8 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * provides a reference to the replication table for updates by tablet servers
@@ -57,9 +61,10 @@ import org.apache.log4j.Logger;
 public class ReplicationTableUtil {
 
   private static Map<Credentials,Writer> writers = new HashMap<Credentials,Writer>();
-  private static final Logger log = Logger.getLogger(ReplicationTableUtil.class);
+  private static final Logger log = LoggerFactory.getLogger(ReplicationTableUtil.class);
 
   public static final String COMBINER_NAME = "replcombiner";
+  public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName();
 
   private ReplicationTableUtil() {}
 
@@ -116,6 +121,37 @@ public class ReplicationTableUtil {
         throw new RuntimeException(e);
       }
     }
+
+    // Make sure the StatusFormatter is set on the metadata table
+    Iterable<Entry<String,String>> properties;
+    try {
+      properties = tops.getProperties(tableName);
+    } catch (AccumuloException | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    for (Entry<String,String> property : properties) {
+      if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) {
+        if (!STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) {
+          log.info("Setting formatter for {} from {} to {}", tableName, property.getValue(), STATUS_FORMATTER_CLASS_NAME);
+          try {
+            tops.setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
+          } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        // Don't need to keep iterating over the properties after we found the one we were looking for
+        return;
+      }
+    }
+
+    // Set the formatter on the table because it wasn't already there
+    try {
+      tops.setProperty(tableName, Property.TABLE_FORMATTER_CLASS.getKey(), STATUS_FORMATTER_CLASS_NAME);
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -128,13 +164,13 @@ public class ReplicationTableUtil {
         t.update(m);
         return;
       } catch (AccumuloException e) {
-        log.error(e, e);
+        log.error(e.toString(), e);
       } catch (AccumuloSecurityException e) {
-        log.error(e, e);
+        log.error(e.toString(), e);
       } catch (ConstraintViolationException e) {
-        log.error(e, e);
+        log.error(e.toString(), e);
       } catch (TableNotFoundException e) {
-        log.error(e, e);
+        log.error(e.toString(), e);
       }
       UtilWaitThread.sleep(1000);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 12295ef..88f13c9 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -28,6 +28,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.Connector;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.client.IteratorSetting.Column;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.impl.Writer;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
@@ -142,6 +144,10 @@ public class ReplicationTableUtilTest {
     tops.attachIterator(myMetadataTable, combiner);
     expectLastCall().once();
 
+    expect(tops.getProperties(myMetadataTable)).andReturn((Iterable<Entry<String,String>>) Collections.<Entry<String,String>> emptyList());
+    tops.setProperty(myMetadataTable, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTableUtil.STATUS_FORMATTER_CLASS_NAME);
+    expectLastCall().once();
+
     replay(conn, tops);
 
     ReplicationTableUtil.configureMetadataTable(conn, myMetadataTable);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
index f04f3e8..4f883af 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.java
@@ -255,7 +255,7 @@ public class DistributedWorkQueueWorkAssigner extends AbstractWorkAssigner {
             log.trace("Not re-queueing work for {}", key);
           }
         } else {
-          log.debug("Not queueing work for {} because [{}] doesn't need replication", file, TextFormat.shortDebugString(status));
+          log.debug("Not queueing work for {} because {} doesn't need replication", file, TextFormat.shortDebugString(status));
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index 7609ca5..f2d110a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -297,7 +297,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
             log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
           }
         } else {
-          log.debug("Not queueing work for {} because [{}] doesn't need replication", file, ProtobufUtil.toString(status));
+          log.debug("Not queueing work for {} because {} doesn't need replication", file, ProtobufUtil.toString(status));
           if (key.equals(keyBeingReplicated)) {
             log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName());
             queuedWorkForPeer.remove(sourceTableId);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e798d500/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 9fb7fe5..d451991 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -123,7 +123,7 @@ public class ReplicationProcessor implements Processor {
     // Replicate that sucker
     Status replicatedStatus = replica.replicate(filePath, status, target);
 
-    log.debug("Completed replication of {} to {}, with new Status [{}]", filePath, target, ProtobufUtil.toString(replicatedStatus));
+    log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
 
     // If we got a different status
     if (!replicatedStatus.equals(status)) {
@@ -132,7 +132,7 @@ public class ReplicationProcessor implements Processor {
       return;
     }
 
-    log.debug("Did not replicate any new data for {} to {}, (was [{}], now is [{}])", filePath, target, TextFormat.shortDebugString(status),
+    log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
         TextFormat.shortDebugString(replicatedStatus));
 
     // otherwise, we didn't actually replicate because there was error sending the data
@@ -175,7 +175,7 @@ public class ReplicationProcessor implements Processor {
     try {
       Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
       BatchWriter bw = ReplicationTable.getBatchWriter(conn);
-      log.debug("Recording new status for {}, [{}]", filePath.toString(), TextFormat.shortDebugString(status));
+      log.debug("Recording new status for {}, {}", filePath.toString(), TextFormat.shortDebugString(status));
       Mutation m = new Mutation(filePath.toString());
       WorkSection.add(m, target.toText(), ProtobufUtil.toValue(status));
       bw.addMutation(m);


[3/5] git commit: ACCUMULO-378 Increase the timeout as jenkins failed here writing the data

Posted by el...@apache.org.
ACCUMULO-378 Increase the timeout as jenkins failed here writing the data


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/abea3c6a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/abea3c6a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/abea3c6a

Branch: refs/heads/ACCUMULO-378
Commit: abea3c6afb06a7abaaba8bc92698dc25fdf12276
Parents: 3243d2f
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 13:32:34 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 13:32:34 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/test/replication/ReplicationWithGCIT.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/abea3c6a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 9047533..7d9c537 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -102,7 +102,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     return metadataWals;
   }
 
-  @Test(timeout = 4 * 60 * 1000)
+  @Test(timeout = 6 * 60 * 1000)
   public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
     Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
     for (ProcessReference ref : gcProcs) {


[4/5] git commit: ACCUMULO-378 When re-syncing to where we left off on reads, we need to track all tids for our table

Posted by el...@apache.org.
ACCUMULO-378 When re-syncing to where we left off on reads, we need to track all tids for our table

Fixes an issue where when the DEFINE_TABLET wasn't contained in the batch
of log entries that we were reading, we ignored all of the mutations.When
we read past all of the old data, we still need to track the tids
for the table which we're replicating.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/59177233
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/59177233
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/59177233

Branch: refs/heads/ACCUMULO-378
Commit: 59177233fd903d5c69592c67e603c58bc2a0ed2a
Parents: abea3c6
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 13:37:14 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 13:37:14 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 43 +++++++---
 .../replication/AccumuloReplicaSystemTest.java  | 82 +++++++++++++++++++-
 2 files changed, 109 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 6cd3358..ca1382f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -194,7 +194,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
                     return kvs;
                   }
                 } else {
-                  WalReplication edits = getWalEdits(target, p, status, sizeLimit);
+                  WalReplication edits = getWalEdits(target, getWalStream(p), p, status, sizeLimit);
 
                   // If we have some edits to send
                   if (0 < edits.walEdits.getEditsSize()) {
@@ -206,8 +206,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
                     // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
                     // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
                     return edits;
-                  } else if (edits.entriesConsumed == Long.MAX_VALUE) {
-                    // Even if we send no data, we must record the new begin value to account for the inf+ length
+                  } else if (edits.entriesConsumed > 0) {
+                    // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
+                    // log entries multiple times to determine if they should be sent
                     return edits;
                   }
                 }
@@ -249,13 +250,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     throw new UnsupportedOperationException();
   }
 
-  protected WalReplication getWalEdits(ReplicationTarget target, Path p, Status status, long sizeLimit) throws IOException {
-    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
-    DataInputStream wal = streams.getDecryptingInputStream();
+  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
+    Set<Integer> desiredTids = new HashSet<>();
+
     // Read through the stuff we've already processed in a previous replication attempt
+    // We also need to track the tids that occurred earlier in the file as mutations
+    // later on might use that tid
     for (long i = 0; i < status.getBegin(); i++) {
       try {
         key.readFields(wal);
@@ -264,9 +267,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
         log.warn("Unexpectedly reached the end of file.");
         return new WalReplication(new WalEdits(), 0, 0, 0);
       }
+
+      switch (key.event) {
+        case DEFINE_TABLET:
+          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
+            desiredTids.add(key.tid);
+          }
+          break;
+        default:
+          break;
+      }
     }
 
-    WalReplication repl = getEdits(wal, sizeLimit, target, status, p);
+    WalReplication repl = getEdits(wal, sizeLimit, target, status, p, desiredTids);
 
     log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == repl.entriesConsumed) ? "all"
         : repl.entriesConsumed, repl.sizeInBytes, p);
@@ -274,7 +287,12 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return repl;
   }
 
-  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p) throws IOException {
+  protected DataInputStream getWalStream(Path p) throws IOException {
+    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
+    return streams.getDecryptingInputStream();
+  }
+
+  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p, Set<Integer> desiredTids) throws IOException {
     WalEdits edits = new WalEdits();
     edits.edits = new ArrayList<ByteBuffer>();
     long size = 0l;
@@ -283,9 +301,6 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
-    // Any tid for our table needs to be tracked
-    Set<Integer> desiredTids = new HashSet<>();
-
     while (size < sizeLimit) {
       try {
         key.readFields(wal);
@@ -303,6 +318,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
       switch (key.event) {
         case DEFINE_TABLET:
+          // For new DEFINE_TABLETs, we also need to record the new tids we see
           if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
             desiredTids.add(key.tid);
           }
@@ -349,7 +365,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
     }
 
-    log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", value.mutations.size() - mutationsToSend, target.getPeerName());
+    int mutationsRemoved = value.mutations.size() - mutationsToSend;
+    if (mutationsRemoved > 0) {
+      log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", mutationsRemoved, target.getPeerName());
+    }
 
     out.writeInt(mutationsToSend);
     for (Mutation m : value.mutations) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 07d1201..85204e3 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 
@@ -146,7 +147,7 @@ public class AccumuloReplicaSystemTest {
 
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(9, repl.entriesConsumed);
@@ -253,7 +254,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -318,7 +319,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -340,7 +341,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(0, repl.entriesConsumed);
@@ -348,4 +349,77 @@ public class AccumuloReplicaSystemTest {
     Assert.assertEquals(0, repl.sizeInRecords);
     Assert.assertEquals(0, repl.sizeInBytes);
   }
+
+  @Test
+  public void restartInFileKnowsAboutPreviousTableDefines() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    // What is seq used for?
+    key.seq = 1l;
+
+    /*
+     * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing whatsoever in reality regarding what these entries would actually
+     * look like in a WAL. They are solely for testing that each LogEvents is handled, order is not important.
+     */
+    key.event = LogEvents.DEFINE_TABLET;
+    key.tablet = new KeyExtent(new Text("1"), null, null);
+    key.tid = 1;
+
+    key.write(dos);
+    value.write(dos);
+
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.tid = 1;
+    key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    dos.close();
+
+    Map<String,String> confMap = new HashMap<>();
+    confMap.put(Property.REPLICATION_NAME.getKey(), "source");
+    AccumuloConfiguration conf = new ConfigurationCopy(confMap);
+
+    AccumuloReplicaSystem ars = new AccumuloReplicaSystem();
+    ars.setConf(conf);
+
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+    // Only consume the first mutation, not the second
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+
+    // We stopped because we got to the end of the file
+    Assert.assertEquals(2, repl.entriesConsumed);
+    Assert.assertEquals(1, repl.walEdits.getEditsSize());
+    Assert.assertEquals(1, repl.sizeInRecords);
+    Assert.assertNotEquals(0, repl.sizeInBytes);
+
+    status = Status.newBuilder(status).setBegin(2).build();
+    dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+    // Consume the rest of the mutations
+    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+
+    // We stopped because we got to the end of the file
+    Assert.assertEquals(1, repl.entriesConsumed);
+    Assert.assertEquals(1, repl.walEdits.getEditsSize());
+    Assert.assertEquals(1, repl.sizeInRecords);
+    Assert.assertNotEquals(0, repl.sizeInBytes);
+  }
 }