Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:27 UTC

[49/50] [abbrv] git commit: ACCUMULO-2583 Implement a basic application of replicated data to the remote table.

ACCUMULO-2583 Implement a basic application of replicated data to the remote table.

Very little consideration has been given to error handling, but it works as a first step.
It does not yet record progress on the remote side.
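
In practice the handler, shown in full below, decodes each WAL edit back into a LogFileKey/LogFileValue pair and pushes the recovered mutations at the peer table through a BatchWriter. A minimal sketch of that apply loop with the thrift and connector plumbing stripped out (the class and method names here are illustrative, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;

public class ApplyEditsSketch {
  // 'edits' would come from WalEdits.getEdits(); 'bw' would wrap the peer table.
  static long apply(List<ByteBuffer> edits, BatchWriter bw) throws Exception {
    LogFileKey key = new LogFileKey();
    LogFileValue value = new LogFileValue();
    for (ByteBuffer edit : edits) {
      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(edit.array()));
      key.readFields(dis);   // WAL entry metadata
      value.readFields(dis); // the mutations carried by that entry
      bw.addMutations(value.mutations);
    }
    bw.flush();
    return edits.size();
  }
}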


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e117a78c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e117a78c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e117a78c

Branch: refs/heads/ACCUMULO-378
Commit: e117a78cb9b6d4324bf14dfdaecb04a86dc92cda
Parents: e84879c
Author: Josh Elser <el...@apache.org>
Authored: Thu May 8 22:58:07 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 8 22:58:07 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +-
 .../replication/RemoteReplicationErrorCode.java |  27 +++++
 .../apache/accumulo/tserver/TabletServer.java   |   2 +-
 .../replication/ReplicationServicerHandler.java | 103 ++++++++++++++++++-
 .../test/replication/ReplicationIT.java         |  61 +++++++----
 5 files changed, 170 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a87b1b4..38e8cc3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -461,7 +461,7 @@ public enum Property {
   @Experimental
   REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
   @Experimental
-  REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
+  REPLICATION_WORKER_THREADS("replication.worker.threads", "1", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
   @Experimental
   REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
   @Experimental
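
This hunk lowers the default replication worker pool on each tabletserver from 4 threads to 1; deployments can still raise it through the property. As a rough illustration of how a COUNT property like this gets consumed, a sketch that sizes a pool from the system configuration (the pool wiring is an assumption for illustration, not code from this commit):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;

public class ReplicationPoolSketch {
  // Size the replication worker pool from replication.worker.threads (default now 1).
  static ExecutorService newReplicationPool(AccumuloConfiguration conf) {
    int workers = conf.getCount(Property.REPLICATION_WORKER_THREADS);
    return Executors.newFixedThreadPool(workers);
  }
}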

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
new file mode 100644
index 0000000..b089307
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+/**
+ * Error conditions that a peer can report back when it fails to apply replicated data.
+ */
+public enum RemoteReplicationErrorCode {
+  COULD_NOT_DESERIALIZE,
+  COULD_NOT_APPLY,
+  TABLE_DOES_NOT_EXIST,
+  CANNOT_AUTHENTICATE
+}
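
The server-side code below reports these conditions over thrift as the enum's ordinal. A caller that wants the symbolic name back can map the integer through values(); a small sketch, assuming the integer was produced by ordinal() on this same enum (the helper class is illustrative):

import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;

public class RemoteErrorCodes {
  // Map an integer produced by RemoteReplicationErrorCode.ordinal() back to the enum constant.
  static RemoteReplicationErrorCode fromCode(int code) {
    RemoteReplicationErrorCode[] values = RemoteReplicationErrorCode.values();
    if (code < 0 || code >= values.length) {
      throw new IllegalArgumentException("Unknown remote replication error code: " + code);
    }
    return values[code];
  }
}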

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0b5c6bf..f04bf6b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
   }
 
   private HostAndPort startReplicationService() throws UnknownHostException {
-    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this));
+    ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance(), getSystemConfiguration()));
     ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
     AccumuloConfiguration conf = getSystemConfiguration();
     Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index 5036cab..336bf87 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -16,10 +16,30 @@
  */
 package org.apache.accumulo.tserver.replication;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
 import org.apache.accumulo.core.replication.thrift.KeyValues;
 import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
 import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface;
 import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,12 +50,93 @@ import org.slf4j.LoggerFactory;
 public class ReplicationServicerHandler implements Iface {
   private static final Logger log = LoggerFactory.getLogger(ReplicationServicerHandler.class);
 
+  private Instance inst;
+  private AccumuloConfiguration conf;
+
+  public ReplicationServicerHandler(Instance inst, AccumuloConfiguration conf) {
+    this.inst = inst;
+    this.conf = conf;
+  }
+  
+
   @Override
   public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException {
-    log.error("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
+    log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
+
+    String tableId = Integer.toString(remoteTableId);
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    BatchWriter bw = null;
+    try {
+      for (ByteBuffer edit : data.getEdits()) {
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(edit.array()));
+        try {
+          key.readFields(dis);
+          value.readFields(dis);
+        } catch (IOException e) {
+          log.error("Could not deserialize edit from stream", e);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream");
+        }
+    
+        // Create the BatchWriter if we don't already have one.
+        if (null == bw) {
+          bw = getBatchWriter(tableId);
+        }
+
+        try {
+          bw.addMutations(value.mutations);
+        } catch (MutationsRejectedException e) {
+          log.error("Could not apply mutations to {}", remoteTableId);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId);
+        }
+      }
+    } finally {
+      if (null != bw) {
+        try {
+          bw.close();
+        } catch (MutationsRejectedException e) {
+          log.error("Could not apply mutations to {}", remoteTableId);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId);
+        }
+      }
+    }
+      
+    
     return data.getEditsSize();
   }
 
+  protected BatchWriter getBatchWriter(String tableId) throws RemoteReplicationException {
+    Credentials creds = SystemCredentials.get();
+    Connector conn;
+    String tableName = null;
+    try {
+      conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.error("Could not get connector with system credentials. Something is very wrong", e);
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get Connector with system credentials");
+    }
+
+    for (Entry<String,String> nameToId : conn.tableOperations().tableIdMap().entrySet()) {
+      if (tableId.equals(nameToId.getValue())) {
+        tableName = nameToId.getKey();
+        break;
+      }
+    }
+
+    if (null == tableName) {
+      log.error("Table with id of " + tableId + " does not exist");
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist");
+    }
+
+    try {
+      return conn.createBatchWriter(tableName, new BatchWriterConfig());
+    } catch (TableNotFoundException e) {
+      log.error("Table with id of " + tableId + " does not exist");
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist");
+    }
+  }
+
   @Override
   public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException {
     throw new UnsupportedOperationException();
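
replicateLog() expects each ByteBuffer in WalEdits to hold a LogFileKey immediately followed by a LogFileValue, matching the readFields() calls above. For reference, a sketch of the corresponding encoding on the sending side, assuming both classes provide the usual Writable write(DataOutput) counterpart (the helper name is illustrative):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;

public class EditEncodingSketch {
  // Encode one WAL entry the way replicateLog() decodes it: key, then value, back to back.
  static ByteBuffer encode(LogFileKey key, LogFileValue value) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    key.write(out);
    value.write(out);
    out.flush();
    return ByteBuffer.wrap(baos.toByteArray()); // array-backed, so edit.array() on the peer works
  }
}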

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 0ad8066..3c1ebac 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -26,21 +26,25 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.monitor.Monitor;
 import org.apache.accumulo.server.replication.ReplicationTable;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.TextFormat;
+import com.google.common.collect.Iterables;
 
 /**
  * 
@@ -51,7 +55,7 @@ public class ReplicationIT extends ConfigurableMacIT {
   @Override
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setNumTservers(1);
-    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
     cfg.setProperty(Property.GC_CYCLE_START, "1s");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
     cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s");
@@ -59,18 +63,22 @@ public class ReplicationIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  @Test//(timeout = 60 * 1000)
-  public void test() throws Exception {
+  @Test(timeout = 60 * 5000)
+  public void dataWasReplicatedToThePeer() throws Exception {
     MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
         ROOT_PASSWORD);
     peerCfg.setNumTservers(1);
     peerCfg.setInstanceName("peer");
+    peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
     peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
     peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+    peerCfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
     MiniAccumuloClusterImpl peerCluster = peerCfg.build();
 
     peerCluster.start();
 
+    Process monitor = peerCluster.exec(Monitor.class);
+
     Connector connMaster = getConnector();
     Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
 
@@ -90,23 +98,23 @@ public class ReplicationIT extends ConfigurableMacIT {
     String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
     Assert.assertNotNull(peerTableId);
 
-    // Replicate this table to the peerClusterName in a table with the peerTableId table id 
+    // Replicate this table to the peerClusterName in a table with the peerTableId table id
     connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
     connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
 
     // Write some data to table1
     BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
-    for (int i = 0; i < 100; i++) {
-      for (int rows = 0; rows < 1000; rows++) {
-        Mutation m = new Mutation(i + "_" + Integer.toString(rows));
-        for (int cols = 0; cols < 400; cols++) {
-          String value = Integer.toString(cols);
-          m.put(value, "", value);
-        }
-        bw.addMutation(m);
+    for (int rows = 0; rows < 5000; rows++) {
+      Mutation m = new Mutation(Integer.toString(rows));
+      for (int cols = 0; cols < 100; cols++) {
+        String value = Integer.toString(cols);
+        m.put(value, "", value);
       }
+      bw.addMutation(m);
     }
 
+    log.info("Wrote all data to master cluster");
+
     bw.close();
 
     while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
@@ -114,19 +122,28 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     connMaster.tableOperations().compact(masterTable, null, null, true, false);
-    for (int i = 0; i < 10; i++) {
+
+    // Wait until we fully replicated something
+    boolean fullyReplicated = false;
+    for (int i = 0; i < 10 && !fullyReplicated; i++) {
+      UtilWaitThread.sleep(2000);
+
       Scanner s = ReplicationTable.getScanner(connMaster);
-      for (Entry<Key,Value> e : s) {
-        Path p = new Path(e.getKey().getRow().toString());
-        log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+      WorkSection.limit(s);
+      for (Entry<Key,Value> entry : s) {
+        Status status = Status.parseFrom(entry.getValue().get());
+        if (StatusUtil.isFullyReplicated(status)) {
+          fullyReplicated |= true;
+        }
       }
+    }
 
-      log.info("");
-      log.info("");
+    Assert.assertTrue("Expected at least one file to be fully replicated", fullyReplicated);
 
-      Thread.sleep(3000);
-    }
+    // Once we fully replicated some file, we are guaranteed to have data on the remote
+    Assert.assertTrue(0 < Iterables.size(connPeer.createScanner(peerTable, Authorizations.EMPTY)));
 
+    monitor.destroy();
     peerCluster.stop();
   }