You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/20 03:25:21 UTC

[2/6] git commit: ACCUMULO-2925 Create regular Mutations from ServerMutations when applying replication data on a peer

ACCUMULO-2925 Create regular Mutations from ServerMutations when applying replication data on a peer

Mutations do not store unserialized ColumnUpdates, but only generate them
on demand via the getter. This is intended to create an efficient implementation
(in both performance and size) while preserving immutability.

Server-assigned timestamps work around this immutability by wrapping normal
Mutations in a ServerMutation and ColumnUpdates with ServerColumnUpdates. By doing
this, ServerMutations can "fake" the timestamp on ColumnUpdates that otherwise
do not have a timestamp set.

In the context of replication, this is still a problem as all Mutations that are
sent to a peer are ServerMutations (as we read them from a WAL). These Mutations are
deserialized and passed into a BatchWriter to apply to the local instance; however, the
BatchWriter is ignorant of ServerMutations and the special timestamp handling.

When the BatchWriter makes a "copy" of the Mutation (see ACCUMULO-2915), despite this
being a shallow copy, the server-assigned timestamp is lost by creating a regular
Mutation from what was a ServerMutation. Even if this were possible, the TMutation
class, which the BatchWriter eventually uses to send the Mutations to a TabletServer,
is also ignorant of the ServerMutation timestamp without modification of the serialization
and TMutation class.

As such, the only option left is that, when encountering ServerMutations in the BatchWriterReplicationReplayer
code, we *must* create new Mutations, applying the server-assigned timestamp (if present) to
each new Mutation we create, to ensure that the timestamp is correctly propagated to this peer.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b062a0bd
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b062a0bd
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b062a0bd

Branch: refs/heads/master
Commit: b062a0bd3ed388f89bc04dfa2903bf3cc951976c
Parents: 4d6683a
Author: Josh Elser <el...@apache.org>
Authored: Thu Jun 19 14:02:42 2014 -0700
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jun 19 14:10:01 2014 -0700

----------------------------------------------------------------------
 .../AccumuloReplicationReplayer.java            |   3 +-
 .../replication/AccumuloReplicaSystem.java      |   6 +-
 .../BatchWriterReplicationReplayer.java         |  52 +++++++-
 .../replication/ReplicationServicerHandler.java |   4 +-
 .../BatchWriterReplicationReplayerTest.java     | 119 +++++++++++++++++++
 5 files changed, 174 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b062a0bd/core/src/main/java/org/apache/accumulo/core/replication/AccumuloReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/AccumuloReplicationReplayer.java b/core/src/main/java/org/apache/accumulo/core/replication/AccumuloReplicationReplayer.java
index 916d33b..620381d 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/AccumuloReplicationReplayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/AccumuloReplicationReplayer.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.core.replication;
 
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
@@ -26,7 +27,7 @@ import org.apache.accumulo.core.replication.thrift.WalEdits;
  */
 public interface AccumuloReplicationReplayer {
 
-  public long replicateLog(Connector conn, String tableName, WalEdits data) throws RemoteReplicationException;
+  public long replicateLog(Connector conn, AccumuloConfiguration conf, String tableName, WalEdits data) throws RemoteReplicationException;
   public long replicateKeyValues(Connector conn, String tableName, KeyValues kvs) throws RemoteReplicationException;
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b062a0bd/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 977020b..7fd2471 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -57,8 +56,6 @@ import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
-import org.apache.accumulo.server.data.ServerColumnUpdate;
-import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.trace.instrument.Span;
@@ -584,6 +581,9 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
    * {@link ReplicationTarget}
    */
   protected long writeValueAvoidingReplicationCycles(DataOutputStream out, LogFileValue value, ReplicationTarget target) throws IOException {
+    // TODO This works like LogFileValue, and needs to be parsable by it, which makes this serialization brittle.
+    // see matching TODO in BatchWriterReplicationReplayer
+
     int mutationsToSend = 0;
     for (Mutation m : value.mutations) {
       if (!m.getReplicationSources().contains(target.getPeerName())) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b062a0bd/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index 8b1a402..1d2a529 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -20,6 +20,8 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -28,13 +30,15 @@ import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
-import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.server.data.ServerMutation;
 import org.apache.accumulo.tserver.logger.LogFileKey;
 import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.slf4j.Logger;
@@ -48,8 +52,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
   private static final Logger log = LoggerFactory.getLogger(BatchWriterReplicationReplayer.class);
 
   @Override
-  public long replicateLog(Connector conn, String tableName, WalEdits data) throws RemoteReplicationException {
-    final AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+  public long replicateLog(Connector conn, AccumuloConfiguration conf, String tableName, WalEdits data) throws RemoteReplicationException {
     final LogFileKey key = new LogFileKey();
     final LogFileValue value = new LogFileValue();
     final long memoryInBytes = conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY);
@@ -61,6 +64,8 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
         DataInputStream dis = new DataInputStream(new ByteArrayInputStream(edit.array()));
         try {
           key.readFields(dis);
+          // TODO this is brittle because AccumuloReplicaSystem isn't actually calling LogFileValue.write, but we're expecting
+          // what we receive to be readable by the LogFileValue.
           value.readFields(dis);
         } catch (IOException e) {
           log.error("Could not deserialize edit from stream", e);
@@ -80,8 +85,45 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
 
         log.info("Applying {} mutations to table {} as part of batch", value.mutations.size(), tableName);
 
+        // If we got a ServerMutation, we have to make sure that we preserve the systemTimestamp otherwise
+        // the local system will assign a new timestamp.
+        List<Mutation> mutationsCopy = new ArrayList<>(value.mutations.size());
+        long mutationsCopied = 0l;
+        for (Mutation orig : value.mutations) {
+          if (orig instanceof ServerMutation) {
+            mutationsCopied++;
+
+            ServerMutation origServer = (ServerMutation) orig;
+            Mutation copy = new Mutation(orig.getRow());
+            for (ColumnUpdate update : orig.getUpdates()) {
+              long timestamp;
+
+              // If the update doesn't have a timestamp, pull it from the ServerMutation
+              if (!update.hasTimestamp()) {
+                timestamp = origServer.getSystemTimestamp();
+              } else {
+                timestamp = update.getTimestamp();
+              }
+
+              // TODO cache the CVs
+              if (update.isDeleted()) {
+                copy.putDelete(update.getColumnFamily(), update.getColumnQualifier(), new ColumnVisibility(update.getColumnVisibility()), timestamp);
+              } else {
+                copy.put(update.getColumnFamily(), update.getColumnQualifier(), new ColumnVisibility(update.getColumnVisibility()), timestamp,
+                    update.getValue());
+              }
+            }
+
+            mutationsCopy.add(copy);
+          } else {
+            mutationsCopy.add(orig);
+          }
+        }
+
+        log.debug("Copied {} mutations to ensure server-assigned timestamps are propagated", mutationsCopied);
+
         try {
-          bw.addMutations(value.mutations);
+          bw.addMutations(mutationsCopy);
         } catch (MutationsRejectedException e) {
           log.error("Could not apply mutations to {}", tableName);
           throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b062a0bd/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index 06b61c6..b3f1556 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -35,6 +35,8 @@ import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,7 +111,7 @@ public class ReplicationServicerHandler implements Iface {
           + clz.getName());
     }
 
-    long entriesReplicated = replayer.replicateLog(conn, tableName, data);
+    long entriesReplicated = replayer.replicateLog(conn, ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance()), tableName, data);
 
     return entriesReplicated;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b062a0bd/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
new file mode 100644
index 0000000..1e70490
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
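+/**
+ * Tests for {@link BatchWriterReplicationReplayer}, e.g. that a ServerMutation's system timestamp is applied to updates lacking one.
+ */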
+public class BatchWriterReplicationReplayerTest {
+
+  @Test
+  public void systemTimestampsAreSetOnUpdates() throws Exception {
+    final BatchWriterReplicationReplayer replayer = new BatchWriterReplicationReplayer();
+    final String tableName = "foo";
+    final long systemTimestamp = 1000;
+    final BatchWriterConfig bwCfg = new BatchWriterConfig();
+    bwCfg.setMaxMemory(1l);
+
+    Connector conn = createMock(Connector.class);
+    AccumuloConfiguration conf = createMock(AccumuloConfiguration.class);
+    BatchWriter bw = createMock(BatchWriter.class);
+
+    LogFileKey key = new LogFileKey();
+    key.event = LogEvents.MANY_MUTATIONS;
+    key.seq = 1;
+    key.tid = 1;
+
+    WalEdits edits = new WalEdits();
+
+    // Make a mutation without timestamps
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq1", "value");
+    m.put("cf", "cq2", "value");
+    m.put("cf", "cq3", "value");
+    m.put("cf", "cq4", "value");
+    m.put("cf", "cq5", "value");
+
+    // Make it a TMutation
+    TMutation tMutation = m.toThrift();
+
+    // And then make a ServerMutation from the TMutation, adding in our systemTimestamp
+    ServerMutation sMutation = new ServerMutation(tMutation);
+    sMutation.setSystemTimestamp(systemTimestamp);
+
+    // Serialize the ServerMutation (what AccumuloReplicaSystem will be doing)
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+
+    key.write(out);
+    out.writeInt(1);
+    sMutation.write(out);
+
+    out.close();
+
+    // Add it to our "input" to BatchWriterReplicationReplayer
+    edits.addToEdits(ByteBuffer.wrap(baos.toByteArray()));
+
+    Mutation expectedMutation = new Mutation("row");
+    expectedMutation.put("cf", "cq1", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq2", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq3", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq4", sMutation.getSystemTimestamp(), "value");
+    expectedMutation.put("cf", "cq5", sMutation.getSystemTimestamp(), "value");
+
+    expect(conf.getMemoryInBytes(Property.TSERV_REPLICATION_BW_REPLAYER_MEMORY)).andReturn(bwCfg.getMaxMemory());
+    expect(conn.createBatchWriter(tableName, bwCfg)).andReturn(bw);
+
+    bw.addMutations(Lists.newArrayList(expectedMutation));
+    expectLastCall().once();
+
+    bw.close();
+    expectLastCall().once();
+
+    replay(conn, conf, bw);
+    
+    replayer.replicateLog(conn, conf, tableName, edits);
+
+    verify(conn, conf, bw);
+  }
+
+}