You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:27 UTC
[49/50] [abbrv] git commit: ACCUMULO-2583 Implement a basic
application of replicated data to the remote table.
ACCUMULO-2583 Implement a basic application of replicated data to the remote table.
Very little consideration given to error handling, but it works as a first step.
Does not record progress on the remote side yet.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e117a78c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e117a78c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e117a78c
Branch: refs/heads/ACCUMULO-378
Commit: e117a78cb9b6d4324bf14dfdaecb04a86dc92cda
Parents: e84879c
Author: Josh Elser <el...@apache.org>
Authored: Thu May 8 22:58:07 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 8 22:58:07 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 2 +-
.../replication/RemoteReplicationErrorCode.java | 27 +++++
.../apache/accumulo/tserver/TabletServer.java | 2 +-
.../replication/ReplicationServicerHandler.java | 103 ++++++++++++++++++-
.../test/replication/ReplicationIT.java | 61 +++++++----
5 files changed, 170 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a87b1b4..38e8cc3 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -461,7 +461,7 @@ public enum Property {
@Experimental
REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
@Experimental
- REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
+ REPLICATION_WORKER_THREADS("replication.worker.threads", "1", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
@Experimental
REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
@Experimental
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
new file mode 100644
index 0000000..b089307
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.replication;
+
+/**
+ * Error codes a replication peer reports back to the sender when it cannot apply the data it was sent.
+ * <p>
+ * ReplicationServicerHandler passes {@code ordinal()} of these constants as the code of the
+ * RemoteReplicationException it throws. The declaration order therefore determines the numeric codes the
+ * sender sees: append new constants at the end and do not reorder or remove existing ones.
+ */
+public enum RemoteReplicationErrorCode {
+  /** The serialized edit could not be read back into a WAL key/value pair. */
+ COULD_NOT_DESERIALIZE,
+  /** The mutations were rejected when written to (or flushed to) the local table. */
+ COULD_NOT_APPLY,
+  /** No local table has the table id the sender asked to replicate into. */
+ TABLE_DOES_NOT_EXIST,
+  /** A Connector could not be obtained with the system credentials. */
+ CANNOT_AUTHENTICATE
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0b5c6bf..f04bf6b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
private HostAndPort startReplicationService() throws UnknownHostException {
- ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this));
+ ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(HdfsZooInstance.getInstance(), getSystemConfiguration()));
ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
AccumuloConfiguration conf = getSystemConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index 5036cab..336bf87 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -16,10 +16,30 @@
*/
package org.apache.accumulo.tserver.replication;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.KeyValues;
import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface;
import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,12 +50,93 @@ import org.slf4j.LoggerFactory;
public class ReplicationServicerHandler implements Iface {
private static final Logger log = LoggerFactory.getLogger(ReplicationServicerHandler.class);
+  // Instance whose tables receive the replicated mutations
+  private Instance inst;
+  // System configuration supplied by the tserver; not read by any code shown in this change
+  private AccumuloConfiguration conf;
+
+  /**
+   * Builds a handler that applies replicated data to tables of the given instance.
+   *
+   * @param instance instance used to obtain a Connector for writing replicated mutations
+   * @param configuration configuration handed over by the tablet server
+   */
+  public ReplicationServicerHandler(Instance instance, AccumuloConfiguration configuration) {
+    inst = instance;
+    conf = configuration;
+  }
+
+
@Override
public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException {
- log.error("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
+    log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
+
+    // Nothing to apply. Returning early also avoids iterating a null (unset) edits list.
+    if (data.getEditsSize() == 0) {
+      return 0;
+    }
+
+    String tableId = Integer.toString(remoteTableId);
+    LogFileKey key = new LogFileKey();
+    LogFileValue value = new LogFileValue();
+
+    BatchWriter bw = null;
+    // Set only once every edit was handed to the BatchWriter without error
+    boolean allEditsAdded = false;
+    try {
+      for (ByteBuffer edit : data.getEdits()) {
+        // ByteBuffer.array() returns the entire backing array, ignoring arrayOffset/position/limit.
+        // Copy just the readable region, via a duplicate so the caller's buffer position is untouched.
+        byte[] serialized = new byte[edit.remaining()];
+        edit.duplicate().get(serialized);
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(serialized));
+        try {
+          key.readFields(dis);
+          value.readFields(dis);
+        } catch (IOException e) {
+          log.error("Could not deserialize edit from stream", e);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream");
+        }
+
+        // Create the BatchWriter lazily, on the first edit
+        if (null == bw) {
+          bw = getBatchWriter(tableId);
+        }
+
+        try {
+          bw.addMutations(value.mutations);
+        } catch (MutationsRejectedException e) {
+          log.error("Could not apply mutations to {}", remoteTableId, e);
+          throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId);
+        }
+      }
+      allEditsAdded = true;
+    } finally {
+      if (null != bw) {
+        try {
+          bw.close();
+        } catch (MutationsRejectedException e) {
+          log.error("Could not apply mutations to {}", remoteTableId, e);
+          // If an exception is already propagating, don't mask it with this one
+          if (allEditsAdded) {
+            throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + remoteTableId);
+          }
+        }
+      }
+    }
+
return data.getEditsSize();
}
+  /**
+   * Opens a BatchWriter on the local table with the given id, authenticating with the system credentials.
+   *
+   * @param tableId id of the local table to write to
+   * @return a BatchWriter created with a default BatchWriterConfig; the caller is responsible for closing it
+   * @throws RemoteReplicationException with code CANNOT_AUTHENTICATE if no Connector can be obtained, or
+   *           TABLE_DOES_NOT_EXIST if no table has the requested id
+   */
+  protected BatchWriter getBatchWriter(String tableId) throws RemoteReplicationException {
+    Credentials creds = SystemCredentials.get();
+    Connector conn;
+    try {
+      conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      log.error("Could not get connector with system credentials. Something is very wrong", e);
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get Connector with system credentials");
+    }
+
+    // tableIdMap() maps table name -> table id, so search its values for the requested id
+    String tableName = null;
+    for (Entry<String,String> nameToId : conn.tableOperations().tableIdMap().entrySet()) {
+      if (tableId.equals(nameToId.getValue())) {
+        tableName = nameToId.getKey();
+        break;
+      }
+    }
+
+    if (null == tableName) {
+      log.error("Table with id of {} does not exist", tableId);
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist");
+    }
+
+    try {
+      return conn.createBatchWriter(tableName, new BatchWriterConfig());
+    } catch (TableNotFoundException e) {
+      // The table may have been deleted between the id lookup and this call
+      log.error("Table with id of {} does not exist", tableId, e);
+      throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id of " + tableId + " does not exist");
+    }
+  }
+
@Override
public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e117a78c/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 0ad8066..3c1ebac 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -26,21 +26,25 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.protobuf.TextFormat;
+import com.google.common.collect.Iterables;
/**
*
@@ -51,7 +55,7 @@ public class ReplicationIT extends ConfigurableMacIT {
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
cfg.setProperty(Property.GC_CYCLE_START, "1s");
cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s");
@@ -59,18 +63,22 @@ public class ReplicationIT extends ConfigurableMacIT {
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
- @Test//(timeout = 60 * 1000)
- public void test() throws Exception {
+ @Test(timeout = 60 * 5000)
+ public void dataWasReplicatedToThePeer() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
+ peerCfg.setProperty(Property.REPLICATION_THREADCHECK, "5m");
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
+ Process monitor = peerCluster.exec(Monitor.class);
+
Connector connMaster = getConnector();
Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
@@ -90,23 +98,23 @@ public class ReplicationIT extends ConfigurableMacIT {
String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
Assert.assertNotNull(peerTableId);
- // Replicate this table to the peerClusterName in a table with the peerTableId table id
+ // Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
// Write some data to table1
BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
- for (int i = 0; i < 100; i++) {
- for (int rows = 0; rows < 1000; rows++) {
- Mutation m = new Mutation(i + "_" + Integer.toString(rows));
- for (int cols = 0; cols < 400; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
+ for (int rows = 0; rows < 5000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
}
+ bw.addMutation(m);
}
+ log.info("Wrote all data to master cluster");
+
bw.close();
while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
@@ -114,19 +122,28 @@ public class ReplicationIT extends ConfigurableMacIT {
}
connMaster.tableOperations().compact(masterTable, null, null, true, false);
- for (int i = 0; i < 10; i++) {
+
+ // Wait until we fully replicated something
+ boolean fullyReplicated = false;
+ for (int i = 0; i < 10 && !fullyReplicated; i++) {
+ UtilWaitThread.sleep(2000);
+
Scanner s = ReplicationTable.getScanner(connMaster);
- for (Entry<Key,Value> e : s) {
- Path p = new Path(e.getKey().getRow().toString());
- log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+ WorkSection.limit(s);
+ for (Entry<Key,Value> entry : s) {
+ Status status = Status.parseFrom(entry.getValue().get());
+ if (StatusUtil.isFullyReplicated(status)) {
+ fullyReplicated = true;
+ }
}
+ }
- log.info("");
- log.info("");
+ Assert.assertTrue("Did not observe any fully replicated file in the replication table", fullyReplicated);
- Thread.sleep(3000);
- }
+ // Once we fully replicated some file, we are guaranteed to have data on the remote
+ Assert.assertTrue(0 < Iterables.size(connPeer.createScanner(peerTable, Authorizations.EMPTY)));
+ monitor.destroy();
peerCluster.stop();
}