Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/22 19:59:52 UTC

[4/5] git commit: ACCUMULO-378 When re-syncing to where we left off on reads, we need to track all tids for our table

ACCUMULO-378 When re-syncing to where we left off on reads, we need to track all tids for our table

Fixes an issue where, when the DEFINE_TABLET wasn't contained in the batch
of log entries that we were reading, we ignored all of the mutations. When
we read past all of the old data, we still need to track the tids
for the table which we're replicating.
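For readers skimming the diff below, here is a minimal standalone sketch of the behavior this
patch adds. It is not the committed code: the helper name skipAndCollectTids and the wrapper
class are invented for illustration, and the logger package paths are assumed from this
branch's tserver source layout. The idea is that while fast-forwarding over the first `begin`
entries a previous replication pass already consumed, we also record the tid of every
DEFINE_TABLET belonging to the table being replicated, so later MUTATION entries referencing
those tids are not silently dropped.

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Assumed imports: LogFileKey/LogFileValue live in the tserver logger package on this branch.
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;

public class TidTrackingSketch {
  /**
   * Fast-forward over the first `begin` WAL entries (already replicated),
   * collecting the tids of DEFINE_TABLET events for the source table so a
   * subsequent read of MUTATION events knows which tids to keep.
   */
  static Set<Integer> skipAndCollectTids(DataInputStream wal, long begin, String sourceTableId)
      throws IOException {
    Set<Integer> desiredTids = new HashSet<>();
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    for (long i = 0; i < begin; i++) {
      try {
        key.readFields(wal);
        value.readFields(wal);
      } catch (EOFException e) {
        // Fewer entries in the file than `begin`; nothing more to track.
        break;
      }
      switch (key.event) {
        case DEFINE_TABLET:
          // A MUTATION later in the file may reference this tid.
          if (sourceTableId.equals(key.tablet.getTableId().toString())) {
            desiredTids.add(key.tid);
          }
          break;
        default:
          break;
      }
    }
    return desiredTids;
  }
}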


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/59177233
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/59177233
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/59177233

Branch: refs/heads/ACCUMULO-378
Commit: 59177233fd903d5c69592c67e603c58bc2a0ed2a
Parents: abea3c6
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 13:37:14 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 13:37:14 2014 -0400

----------------------------------------------------------------------
 .../replication/AccumuloReplicaSystem.java      | 43 +++++++---
 .../replication/AccumuloReplicaSystemTest.java  | 82 +++++++++++++++++++-
 2 files changed, 109 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 6cd3358..ca1382f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -194,7 +194,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
                     return kvs;
                   }
                 } else {
-                  WalReplication edits = getWalEdits(target, p, status, sizeLimit);
+                  WalReplication edits = getWalEdits(target, getWalStream(p), p, status, sizeLimit);
 
                   // If we have some edits to send
                   if (0 < edits.walEdits.getEditsSize()) {
@@ -206,8 +206,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
                     // We don't have to replicate every LogEvent in the file (only Mutation LogEvents), but we
                     // want to track progress in the file relative to all LogEvents (to avoid duplicative processing/replication)
                     return edits;
-                  } else if (edits.entriesConsumed == Long.MAX_VALUE) {
-                    // Even if we send no data, we must record the new begin value to account for the inf+ length
+                  } else if (edits.entriesConsumed > 0) {
+                    // Even if we send no data, we want to record a non-zero new begin value to avoid checking the same
+                    // log entries multiple times to determine if they should be sent
                     return edits;
                   }
                 }
@@ -249,13 +250,15 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     throw new UnsupportedOperationException();
   }
 
-  protected WalReplication getWalEdits(ReplicationTarget target, Path p, Status status, long sizeLimit) throws IOException {
-    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
-    DataInputStream wal = streams.getDecryptingInputStream();
+  protected WalReplication getWalEdits(ReplicationTarget target, DataInputStream wal, Path p, Status status, long sizeLimit) throws IOException {
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
+    Set<Integer> desiredTids = new HashSet<>();
+
     // Read through the stuff we've already processed in a previous replication attempt
+    // We also need to track the tids that occurred earlier in the file as mutations
+    // later on might use that tid
     for (long i = 0; i < status.getBegin(); i++) {
       try {
         key.readFields(wal);
@@ -264,9 +267,19 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
         log.warn("Unexpectedly reached the end of file.");
         return new WalReplication(new WalEdits(), 0, 0, 0);
       }
+
+      switch (key.event) {
+        case DEFINE_TABLET:
+          if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
+            desiredTids.add(key.tid);
+          }
+          break;
+        default:
+          break;
+      }
     }
 
-    WalReplication repl = getEdits(wal, sizeLimit, target, status, p);
+    WalReplication repl = getEdits(wal, sizeLimit, target, status, p, desiredTids);
 
     log.debug("Read {} WAL entries and retained {} bytes of WAL entries for replication to peer '{}'", (Long.MAX_VALUE == repl.entriesConsumed) ? "all"
         : repl.entriesConsumed, repl.sizeInBytes, p);
@@ -274,7 +287,12 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     return repl;
   }
 
-  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p) throws IOException {
+  protected DataInputStream getWalStream(Path p) throws IOException {
+    DFSLoggerInputStreams streams = DfsLogger.readHeaderAndReturnStream(fs, p, conf);
+    return streams.getDecryptingInputStream();
+  }
+
+  protected WalReplication getEdits(DataInputStream wal, long sizeLimit, ReplicationTarget target, Status status, Path p, Set<Integer> desiredTids) throws IOException {
     WalEdits edits = new WalEdits();
     edits.edits = new ArrayList<ByteBuffer>();
     long size = 0l;
@@ -283,9 +301,6 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
     LogFileKey key = new LogFileKey();
     LogFileValue value = new LogFileValue();
 
-    // Any tid for our table needs to be tracked
-    Set<Integer> desiredTids = new HashSet<>();
-
     while (size < sizeLimit) {
       try {
         key.readFields(wal);
@@ -303,6 +318,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
 
       switch (key.event) {
         case DEFINE_TABLET:
+          // For new DEFINE_TABLETs, we also need to record the new tids we see
           if (target.getSourceTableId().equals(key.tablet.getTableId().toString())) {
             desiredTids.add(key.tid);
           }
@@ -349,7 +365,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
       }
     }
 
-    log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", value.mutations.size() - mutationsToSend, target.getPeerName());
+    int mutationsRemoved = value.mutations.size() - mutationsToSend;
+    if (mutationsRemoved > 0) {
+      log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", mutationsRemoved, target.getPeerName());
+    }
 
     out.writeInt(mutationsToSend);
     for (Mutation m : value.mutations) {

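A side note on the getWalStream(Path) extraction above: it creates a seam for testing. Since
getWalEdits now takes the DataInputStream as a parameter, the production call site composes the
two methods, while a test (like the one added below) can instead hand in a stream built from an
in-memory byte array. A sketch of the production composition, as a fragment inside
AccumuloReplicaSystem using the types shown in the diff above (variable names are illustrative):

  // getWalStream(p) owns opening the WAL and returning the decrypting stream;
  // getWalEdits(...) consumes whatever stream it is given, so tests can substitute
  // a DataInputStream wrapping a ByteArrayInputStream.
  DataInputStream wal = getWalStream(p);
  WalReplication edits = getWalEdits(target, wal, p, status, sizeLimit);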
http://git-wip-us.apache.org/repos/asf/accumulo/blob/59177233/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 07d1201..85204e3 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 
@@ -146,7 +147,7 @@ public class AccumuloReplicaSystemTest {
 
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(9, repl.entriesConsumed);
@@ -253,7 +254,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -318,7 +319,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(Long.MAX_VALUE, repl.entriesConsumed);
@@ -340,7 +341,7 @@ public class AccumuloReplicaSystemTest {
     // If it were still open, more data could be appended that we need to process
     Status status = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
     DataInputStream dis = new DataInputStream(new ByteArrayInputStream(new byte[0]));
-    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"));
+    WalReplication repl = ars.getEdits(dis, Long.MAX_VALUE, new ReplicationTarget("peer", "1", "1"), status, new Path("/accumulo/wals/tserver+port/wal"), new HashSet<Integer>());
 
     // We stopped because we got to the end of the file
     Assert.assertEquals(0, repl.entriesConsumed);
@@ -348,4 +349,77 @@ public class AccumuloReplicaSystemTest {
     Assert.assertEquals(0, repl.sizeInRecords);
     Assert.assertEquals(0, repl.sizeInBytes);
   }
+
+  @Test
+  public void restartInFileKnowsAboutPreviousTableDefines() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    // What is seq used for?
+    key.seq = 1l;
+
+    /*
+     * Disclaimer: the following series of LogFileKey and LogFileValue pairs have *no* bearing whatsoever in reality regarding what these entries would actually
+     * look like in a WAL. They are solely for testing that each LogEvent is handled; order is not important.
+     */
+    key.event = LogEvents.DEFINE_TABLET;
+    key.tablet = new KeyExtent(new Text("1"), null, null);
+    key.tid = 1;
+
+    key.write(dos);
+    value.write(dos);
+
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    key.tablet = null;
+    key.event = LogEvents.MUTATION;
+    key.tid = 1;
+    key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
+    value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
+
+    key.write(dos);
+    value.write(dos);
+
+    dos.close();
+
+    Map<String,String> confMap = new HashMap<>();
+    confMap.put(Property.REPLICATION_NAME.getKey(), "source");
+    AccumuloConfiguration conf = new ConfigurationCopy(confMap);
+
+    AccumuloReplicaSystem ars = new AccumuloReplicaSystem();
+    ars.setConf(conf);
+
+    Status status = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(false).build();
+    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+    // Only consume the first mutation, not the second
+    WalReplication repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+
+    // We stopped because we hit the size limit, not the end of the file
+    Assert.assertEquals(2, repl.entriesConsumed);
+    Assert.assertEquals(1, repl.walEdits.getEditsSize());
+    Assert.assertEquals(1, repl.sizeInRecords);
+    Assert.assertNotEquals(0, repl.sizeInBytes);
+
+    status = Status.newBuilder(status).setBegin(2).build();
+    dis = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+
+    // Consume the rest of the mutations
+    repl = ars.getWalEdits(new ReplicationTarget("peer", "1", "1"), dis, new Path("/accumulo/wals/tserver+port/wal"), status, 1);
+
+    // We stopped because we hit the size limit after the remaining mutation
+    Assert.assertEquals(1, repl.entriesConsumed);
+    Assert.assertEquals(1, repl.walEdits.getEditsSize());
+    Assert.assertEquals(1, repl.sizeInRecords);
+    Assert.assertNotEquals(0, repl.sizeInBytes);
+  }
 }