Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/12 20:51:40 UTC
[07/13] accumulo git commit: Merge branch '1.6' into 1.7
Merge branch '1.6' into 1.7
Conflicts:
test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/94f4a19c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/94f4a19c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/94f4a19c
Branch: refs/heads/1.7
Commit: 94f4a19c0d776f3a57e855863c34e9e3164b615b
Parents: 6562828 29c6052
Author: Josh Elser <el...@apache.org>
Authored: Tue Jan 12 12:25:34 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Jan 12 12:25:34 2016 -0500
----------------------------------------------------------------------
test/src/test/java/org/apache/accumulo/test/CleanWalIT.java | 2 +-
.../java/org/apache/accumulo/test/DetectDeadTabletServersIT.java | 2 +-
test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java | 2 +-
.../org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java | 2 +-
.../test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java | 2 +-
.../org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java | 2 +-
.../test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java | 2 +-
.../test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/BinaryStressIT.java | 2 +-
.../test/java/org/apache/accumulo/test/functional/CleanTmpIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/CompactionIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/DurabilityIT.java | 2 +-
.../org/apache/accumulo/test/functional/GarbageCollectorIT.java | 2 +-
.../org/apache/accumulo/test/functional/KerberosRenewalIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/MasterFailoverIT.java | 2 +-
.../test/java/org/apache/accumulo/test/functional/RestartIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/RestartStressIT.java | 2 +-
.../org/apache/accumulo/test/functional/SessionDurabilityIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/WriteAheadLogIT.java | 2 +-
.../org/apache/accumulo/test/functional/ZookeeperRestartIT.java | 2 +-
.../accumulo/test/replication/MultiInstanceReplicationIT.java | 2 +-
.../test/replication/UnorderedWorkAssignerReplicationIT.java | 2 +-
22 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
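Nearly every hunk below makes the same two changes: each test's mini-cluster configure hook raises Property.INSTANCE_ZK_TIMEOUT (previously 3s-5s, or 4s/10s in a couple of tests) to a uniform 15s, and tests that relied on cfg.useMiniDFS(true) switch to the raw local file system so write-ahead-log sync and flush take effect. A minimal sketch of the resulting hook, assuming a ConfigurableMacIT-style test (the class name here is hypothetical, the calls are the ones used throughout the diff):

    import org.apache.accumulo.core.conf.Property;
    import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.RawLocalFileSystem;

    public class ExampleIT extends ConfigurableMacIT { // hypothetical test class
      @Override
      public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
        cfg.setNumTservers(1);
        // a longer ZooKeeper session timeout tolerates slow or loaded test hosts
        cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        // use raw local file system so walogs sync and flush will work
        hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
      }
    }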
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
index d754a14,b2298f7..08e3c09
--- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java
@@@ -57,10 -54,9 +57,10 @@@ public class CleanWalIT extends Accumul
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setNumTservers(1);
- cfg.useMiniDFS(true);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Before
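For context on the fs.file.impl override above: Hadoop's default local file system wraps RawLocalFileSystem in a checksumming layer whose output streams do not honor sync/flush, so WAL durability tests against it would pass or fail vacuously. Pointing the file: scheme at the raw implementation keeps those calls meaningful. A sketch of what the core-site override amounts to outside the mini-cluster harness:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.RawLocalFileSystem;

    Configuration conf = new Configuration();
    conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
    // FileSystem.get now resolves file:// to RawLocalFileSystem, whose
    // streams honor the sync and flush calls the WAL tests depend on.
    FileSystem fs = FileSystem.get(URI.create("file:///"), conf);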
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index 04c781b,1e65601..f7ee089
--- a/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@@ -42,7 -41,7 +42,7 @@@ public class DetectDeadTabletServersIT
@Override
protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
-- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/ExistingMacIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
index 1a3c92f,0000000..57c2c34
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
@@@ -1,101 -1,0 +1,101 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// ACCUMULO-3010
+public class RecoveryCompactionsAreFlushesIT extends AccumuloClusterIT {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ // file system supports recovery
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test
+ public void test() throws Exception {
+ // create a table
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
+ c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "3");
+ // create 3 flush files
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", new Value("v".getBytes()));
+ for (int i = 0; i < 3; i++) {
+ bw.addMutation(m);
+ bw.flush();
+ c.tableOperations().flush(tableName, null, null, true);
+ }
+ // create an unsaved mutation
+ bw.addMutation(m);
+ bw.close();
+
+ ClusterControl control = cluster.getClusterControl();
+
+ // kill the tablet servers
+ control.stopAllServers(ServerType.TABLET_SERVER);
+
+ // recover
+ control.startAllServers(ServerType.TABLET_SERVER);
+
+ // ensure the table is readable
+ Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
+
+ // ensure that the recovery was not a merging minor compaction
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : s) {
+ String filename = entry.getKey().getColumnQualifier().toString();
+ String parts[] = filename.split("/");
+ Assert.assertFalse(parts[parts.length - 1].startsWith("M"));
+ }
+ }
+
+}
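The final assertion reads on Accumulo's data-file naming convention: the leading letter of a tablet file records how it was produced, with flushed files prefixed F and merging minor compaction files prefixed M. So for metadata file entries such as (paths illustrative):

    .../tables/1/default_tablet/F0000070.rf   // flushed file: what recovery should produce
    .../tables/1/default_tablet/M0000071.rf   // merging minor compaction: would fail the test

rejecting any qualifier whose last path segment starts with "M" is exactly the "recovery compactions are flushes" property (ACCUMULO-3010) this test guards.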
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index 081ee85,0000000..68bd07b
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@@ -1,75 -1,0 +1,75 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+// ACCUMULO-2480
+public class TabletServerGivesUpIT extends ConfigurableMacIT {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15");
+ cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s");
+ }
+
+ @Test(timeout = 30 * 1000)
+ public void test() throws Exception {
+ final Connector conn = this.getConnector();
+ // Yes, there's a tabletserver
+ assertEquals(1, conn.instanceOperations().getTabletServers().size());
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ // Kill dfs
+ cluster.getMiniDfs().shutdown();
+ // ask the tserver to do something
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ Thread splitter = new Thread() {
+ @Override
+ public void run() {
+ try {
+ TreeSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("X"));
+ conn.tableOperations().addSplits(tableName, splits);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ };
+ splitter.start();
+ // wait for the tserver to give up on writing to the WAL
+ while (conn.instanceOperations().getTabletServers().size() == 1) {
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
index 8338021,0000000..c318075
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
@@@ -1,107 -1,0 +1,107 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class VerifySerialRecoveryIT extends ConfigurableMacIT {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "20");
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testSerializedRecovery() throws Exception {
+ // make a table with many splits
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ SortedSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 200; i++) {
+ splits.add(new Text(AssignmentThreadsIT.randomHex(8)));
+ }
+ c.tableOperations().addSplits(tableName, splits);
+ // load data to give the recovery something to do
+ BatchWriter bw = c.createBatchWriter(tableName, null);
+ for (int i = 0; i < 50000; i++) {
+ Mutation m = new Mutation(AssignmentThreadsIT.randomHex(8));
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ // kill the tserver
+ for (ProcessReference ref : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
+ getCluster().killProcess(ServerType.TABLET_SERVER, ref);
+ final Process ts = cluster.exec(TabletServer.class);
+
+ // wait for recovery
+ Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
+ assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ ts.waitFor();
+ String result = FunctionalTestUtils.readAll(cluster, TabletServer.class, ts);
+ for (String line : result.split("\n")) {
+ System.out.println(line);
+ }
+ // walk through the output, verifying that only a single normal recovery was running at one time
+ boolean started = false;
+ int recoveries = 0;
+ for (String line : result.split("\n")) {
+ // ignore metadata tables
+ if (line.contains("!0") || line.contains("+r"))
+ continue;
+ if (line.contains("Starting Write-Ahead Log")) {
+ assertFalse(started);
+ started = true;
+ recoveries++;
+ }
+ if (line.contains("Write-Ahead Log recovery complete")) {
+ assertTrue(started);
+ started = false;
+ }
+ }
+ assertFalse(started);
+ assertTrue(recoveries > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
index 62d8738,fbe504e..f1bca1b
--- a/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@@ -50,9 -51,10 +50,9 @@@ public class BinaryStressIT extends Acc
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
- Map<String,String> siteConfig = new HashMap<String,String>();
- siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K");
- siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
- cfg.setSiteConfig(siteConfig);
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_MAXMEM, "50K");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
}
private String majcDelay, maxMem;
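Beyond the timeout bump, this hunk (and the CompactionIT, GarbageCollectorIT, RestartIT, and WriteAheadLogIT hunks below) replaces the map-based site configuration with direct setProperty calls. The two forms configure the cluster identically; the second is the 1.7-side idiom, sketched here as a fragment of a configure hook:

    // before: raw string keys gathered into a map
    Map<String,String> siteConfig = new HashMap<String,String>();
    siteConfig.put(Property.TSERV_MAXMEM.getKey(), "50K");
    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "0");
    cfg.setSiteConfig(siteConfig);

    // after: the typed MiniAccumuloConfigImpl overload
    cfg.setProperty(Property.TSERV_MAXMEM, "50K");
    cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");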
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
index 921d661,65422c9..d03007e
--- a/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CleanTmpIT.java
@@@ -51,11 -53,12 +51,11 @@@ public class CleanTmpIT extends Configu
private static final Logger log = LoggerFactory.getLogger(CleanTmpIT.class);
@Override
- public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- Map<String,String> props = new HashMap<String,String>();
- props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
- cfg.setSiteConfig(props);
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setNumTservers(1);
- cfg.useMiniDFS(true);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
index 2fe5470,907d17d..818dbc4
--- a/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -58,12 -51,11 +58,12 @@@ public class CompactionIT extends Accum
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
- Map<String,String> map = new HashMap<String,String>();
- map.put(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey(), "4");
- map.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
- map.put(Property.TSERV_MAJC_MAXCONCURRENT.getKey(), "1");
- cfg.setSiteConfig(map);
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_MAJC_THREAD_MAXOPEN, "4");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+ cfg.setProperty(Property.TSERV_MAJC_MAXCONCURRENT, "1");
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
index 9a262a1,0000000..819347e
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/DurabilityIT.java
@@@ -1,222 -1,0 +1,222 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+public class DurabilityIT extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(DurabilityIT.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setNumTservers(1);
+ }
+
+ static final long N = 100000;
+
+ private String[] init() throws Exception {
+ String[] tableNames = getUniqueNames(4);
+ Connector c = getConnector();
+ TableOperations tableOps = c.tableOperations();
+ createTable(tableNames[0]);
+ createTable(tableNames[1]);
+ createTable(tableNames[2]);
+ createTable(tableNames[3]);
+ // default is sync
+ tableOps.setProperty(tableNames[1], Property.TABLE_DURABILITY.getKey(), "flush");
+ tableOps.setProperty(tableNames[2], Property.TABLE_DURABILITY.getKey(), "log");
+ tableOps.setProperty(tableNames[3], Property.TABLE_DURABILITY.getKey(), "none");
+ return tableNames;
+ }
+
+ private void cleanup(String[] tableNames) throws Exception {
+ Connector c = getConnector();
+ for (String tableName : tableNames) {
+ c.tableOperations().delete(tableName);
+ }
+ }
+
+ private void createTable(String tableName) throws Exception {
+ TableOperations tableOps = getConnector().tableOperations();
+ tableOps.create(tableName);
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void testWriteSpeed() throws Exception {
+ TableOperations tableOps = getConnector().tableOperations();
+ String tableNames[] = init();
+ // write some gunk, delete the table to keep that table from messing with the performance numbers of successive calls
+ // sync
+ long t0 = writeSome(tableNames[0], N);
+ tableOps.delete(tableNames[0]);
+ // flush
+ long t1 = writeSome(tableNames[1], N);
+ tableOps.delete(tableNames[1]);
+ // log
+ long t2 = writeSome(tableNames[2], N);
+ tableOps.delete(tableNames[2]);
+ // none
+ long t3 = writeSome(tableNames[3], N);
+ tableOps.delete(tableNames[3]);
+ System.out.println(String.format("sync %d flush %d log %d none %d", t0, t1, t2, t3));
+ assertTrue("flush should be faster than sync", t0 > t1);
+ assertTrue("log should be faster than flush", t1 > t2);
+ assertTrue("no durability should be faster than log", t2 > t3);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testSync() throws Exception {
+ String tableNames[] = init();
+ // sync table should lose nothing
+ writeSome(tableNames[0], N);
+ restartTServer();
+ assertEquals(N, readSome(tableNames[0]));
+ cleanup(tableNames);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testFlush() throws Exception {
+ String tableNames[] = init();
+ // flush table won't lose anything since we're not losing power/dfs
+ writeSome(tableNames[1], N);
+ restartTServer();
+ assertEquals(N, readSome(tableNames[1]));
+ cleanup(tableNames);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testLog() throws Exception {
+ String tableNames[] = init();
+ // we're probably going to lose something with the log setting
+ writeSome(tableNames[2], N);
+ restartTServer();
+ long numResults = readSome(tableNames[2]);
+ assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
+ cleanup(tableNames);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testNone() throws Exception {
+ String tableNames[] = init();
+ // probably won't get any data back without logging
+ writeSome(tableNames[3], N);
+ restartTServer();
+ long numResults = readSome(tableNames[3]);
+ assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
+ cleanup(tableNames);
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testIncreaseDurability() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
+ writeSome(tableName, N);
+ restartTServer();
+ long numResults = readSome(tableName);
+ assertTrue("Expected " + N + " >= " + numResults, N >= numResults);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
+ writeSome(tableName, N);
+ restartTServer();
+ assertTrue(N == readSome(tableName));
+ }
+
+ private static Map<String,String> map(Iterable<Entry<String,String>> entries) {
+ Map<String,String> result = new HashMap<String,String>();
+ for (Entry<String,String> entry : entries) {
+ result.put(entry.getKey(), entry.getValue());
+ }
+ return result;
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testMetaDurability() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ c.instanceOperations().setProperty(Property.TABLE_DURABILITY.getKey(), "none");
+ Map<String,String> props = map(c.tableOperations().getProperties(MetadataTable.NAME));
+ assertEquals("sync", props.get(Property.TABLE_DURABILITY.getKey()));
+ c.tableOperations().create(tableName);
+ props = map(c.tableOperations().getProperties(tableName));
+ assertEquals("none", props.get(Property.TABLE_DURABILITY.getKey()));
+ restartTServer();
+ assertTrue(c.tableOperations().exists(tableName));
+ }
+
+ private long readSome(String table) throws Exception {
+ return Iterators.size(getConnector().createScanner(table, Authorizations.EMPTY).iterator());
+ }
+
+ private void restartTServer() throws Exception {
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.start();
+ }
+
+ private long writeSome(String table, long count) throws Exception {
+ int iterations = 5;
+ long[] attempts = new long[iterations];
+ for (int attempt = 0; attempt < iterations; attempt++) {
+ long now = System.currentTimeMillis();
+ Connector c = getConnector();
+ BatchWriter bw = c.createBatchWriter(table, null);
+ for (int i = 1; i < count + 1; i++) {
+ Mutation m = new Mutation("" + i);
+ m.put("", "", "");
+ bw.addMutation(m);
+ if (i % (Math.max(1, count / 100)) == 0) {
+ bw.flush();
+ }
+ }
+ bw.close();
+ attempts[attempt] = System.currentTimeMillis() - now;
+ }
+ Arrays.sort(attempts);
+ log.info("Attempt durations: {}", Arrays.toString(attempts));
+ // Return the median duration
+ return attempts[2];
+ }
+
+}
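DurabilityIT exercises the four TABLE_DURABILITY levels in decreasing strength: sync, flush, log, and none, asserting that each weaker level writes faster and that data loss after a tablet-server kill grows accordingly. The same levels can also be requested per writer, overriding the table setting, which SessionDurabilityIT below depends on; a minimal sketch using the API these tests already exercise:

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Durability;

    BatchWriterConfig bwConfig = new BatchWriterConfig();
    // per-writer override: stronger (or weaker) than the table's durability
    bwConfig.setDurability(Durability.SYNC);
    BatchWriter bw = conn.createBatchWriter(tableName, bwConfig);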
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 132cbcc,d5b92cf..6ab0541
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@@ -80,13 -80,14 +80,13 @@@ public class GarbageCollectorIT extend
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
- Map<String,String> settings = new HashMap<String,String>();
- settings.put(Property.INSTANCE_SECRET.getKey(), OUR_SECRET);
- settings.put(Property.GC_CYCLE_START.getKey(), "1");
- settings.put(Property.GC_CYCLE_DELAY.getKey(), "1");
- settings.put(Property.GC_PORT.getKey(), "0");
- settings.put(Property.TSERV_MAXMEM.getKey(), "5K");
- settings.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
- cfg.setSiteConfig(settings);
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
+ cfg.setProperty(Property.GC_CYCLE_START, "1");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+ cfg.setProperty(Property.GC_PORT, "0");
+ cfg.setProperty(Property.TSERV_MAXMEM, "5K");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
index 19908f6,0000000..28c1dfc
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/KerberosRenewalIT.java
@@@ -1,188 -1,0 +1,188 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloIT;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * MAC test which uses {@link MiniKdc} to simulate a secure environment. Can be used as a sanity check for Kerberos/SASL testing.
+ */
+public class KerberosRenewalIT extends AccumuloIT {
+ private static final Logger log = LoggerFactory.getLogger(KerberosRenewalIT.class);
+
+ private static TestingKdc kdc;
+ private static String krbEnabledForITs = null;
+ private static ClusterUser rootUser;
+
+ private static final long TICKET_LIFETIME = 6 * 60 * 1000; // Anything less seems to fail when generating the ticket
+ private static final long TICKET_TEST_LIFETIME = 8 * 60 * 1000; // Run a test for 8 mins
+ private static final long TEST_DURATION = 9 * 60 * 1000; // The test should finish within 9 mins
+
+ @BeforeClass
+ public static void startKdc() throws Exception {
+ // 30s renewal time window
+ kdc = new TestingKdc(TestingKdc.computeKdcDir(), TestingKdc.computeKeytabDir(), TICKET_LIFETIME);
+ kdc.start();
+ krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION);
+ if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true");
+ }
+ rootUser = kdc.getRootUser();
+ }
+
+ @AfterClass
+ public static void stopKdc() throws Exception {
+ if (null != kdc) {
+ kdc.stop();
+ }
+ if (null != krbEnabledForITs) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs);
+ }
+ }
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return (int) TEST_DURATION / 1000;
+ }
+
+ private MiniAccumuloClusterImpl mac;
+
+ @Before
+ public void startMac() throws Exception {
+ MiniClusterHarness harness = new MiniClusterHarness();
+ mac = harness.create(this, new PasswordToken("unused"), kdc, new MiniClusterConfigurationCallback() {
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+ Map<String,String> site = cfg.getSiteConfig();
- site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s");
++ site.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
+ // Reduce the period just to make sure we trigger renewal fast
+ site.put(Property.GENERAL_KERBEROS_RENEWAL_PERIOD.getKey(), "5s");
+ cfg.setSiteConfig(site);
+ }
+
+ });
+
+ mac.getConfig().setNumTservers(1);
+ mac.start();
+ // Enabled kerberos auth
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ @After
+ public void stopMac() throws Exception {
+ if (null != mac) {
+ mac.stop();
+ }
+ }
+
+ // Intentionally setting the Test annotation timeout. We do not want to scale the timeout.
+ @Test(timeout = TEST_DURATION)
+ public void testReadAndWriteThroughTicketLifetime() throws Exception {
+ // Attempt to use Accumulo for a duration of time that exceeds the Kerberos ticket lifetime.
+ // This is a functional test to verify that Accumulo services renew their ticket.
+ // If the test doesn't finish on its own, this signifies that Accumulo services failed
+ // and the test should fail. If Accumulo services renew their ticket, the test case
+ // should exit gracefully on its own.
+
+ // Login as the "root" user
+ UserGroupInformation.loginUserFromKeytab(rootUser.getPrincipal(), rootUser.getKeytab().getAbsolutePath());
+ log.info("Logged in as {}", rootUser.getPrincipal());
+
+ Connector conn = mac.getConnector(rootUser.getPrincipal(), new KerberosToken());
+ log.info("Created connector as {}", rootUser.getPrincipal());
+ assertEquals(rootUser.getPrincipal(), conn.whoami());
+
+ long duration = 0;
+ long last = System.currentTimeMillis();
+ // Make sure we have a couple renewals happen
+ while (duration < TICKET_TEST_LIFETIME) {
+ // Create a table, write a record, compact, read the record, drop the table.
+ createReadWriteDrop(conn);
+ // Wait a bit after
+ Thread.sleep(5000);
+
+ // Update the duration
+ long now = System.currentTimeMillis();
+ duration += now - last;
+ last = now;
+ }
+ }
+
+ /**
+ * Creates a table, adds a record to it, and then compacts the table. A simple way to make sure that the system user exists (since the master does an RPC to
+ * the tserver which will create the system user if it doesn't already exist).
+ */
+ private void createReadWriteDrop(Connector conn) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, TableExistsException {
+ final String table = testName.getMethodName() + "_table";
+ conn.tableOperations().create(table);
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", "d");
+ bw.addMutation(m);
+ bw.close();
+ conn.tableOperations().compact(table, new CompactionConfig().setFlush(true).setWait(true));
+ Scanner s = conn.createScanner(table, Authorizations.EMPTY);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+ assertEquals("Did not find the expected key", 0, new Key("a", "b", "c").compareTo(entry.getKey(), PartialKey.ROW_COLFAM_COLQUAL));
+ assertEquals("d", entry.getValue().toString());
+ conn.tableOperations().delete(table);
+ }
+}
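KerberosRenewalIT drives ticket renewal through Accumulo's GENERAL_KERBEROS_RENEWAL_PERIOD (5s here, against a 6-minute ticket), then simply keeps using the connector past expiry. A client managing its own login would do the equivalent by hand with the UserGroupInformation API; a sketch, not part of this diff:

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;

    UserGroupInformation ugi = UserGroupInformation.getLoginUser();
    // re-acquires a TGT from the keytab when the current ticket nears expiry
    ugi.checkTGTAndReloginFromKeytab();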
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
index dd83574,0c2631f..49160aa
--- a/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/MasterFailoverIT.java
@@@ -37,9 -35,7 +37,9 @@@ public class MasterFailoverIT extends A
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setSiteConfig(Collections.singletonMap(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"));
+ Map<String,String> siteConfig = cfg.getSiteConfig();
- siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
+ cfg.setSiteConfig(siteConfig);
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
index 2ba6d31,b498412..d1fb9f9
--- a/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartIT.java
@@@ -70,10 -66,12 +70,10 @@@ public class RestartIT extends Accumulo
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
- Map<String,String> props = new HashMap<String,String>();
- props.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
- props.put(Property.GC_CYCLE_DELAY.getKey(), "1s");
- props.put(Property.GC_CYCLE_START.getKey(), "1s");
- cfg.setSiteConfig(props);
- cfg.useMiniDFS(true);
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
private static final ScannerOpts SOPTS = new ScannerOpts();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
index 3f7d67d,c4b3afd..68448eb
--- a/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/RestartStressIT.java
@@@ -58,10 -55,10 +58,10 @@@ public class RestartStressIT extends Ac
opts.put(Property.TSERV_MAXMEM.getKey(), "100K");
opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms");
opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M");
- opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "5s");
+ opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
cfg.setSiteConfig(opts);
- cfg.useMiniDFS(true);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
index aec6bae,0000000..ca45382
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/SessionDurabilityIT.java
@@@ -1,153 -1,0 +1,153 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Durability;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class SessionDurabilityIT extends ConfigurableMacIT {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ }
+
+ @Test(timeout = 3 * 60 * 1000)
+ public void nondurableTableHasDurableWrites() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default has no durability
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
+ // send durable writes
+ BatchWriterConfig cfg = new BatchWriterConfig();
+ cfg.setDurability(Durability.SYNC);
+ writeSome(tableName, 10, cfg);
+ assertEquals(10, count(tableName));
+ // verify writes survive restart
+ restartTServer();
+ assertEquals(10, count(tableName));
+ }
+
+ @Test(timeout = 3 * 60 * 1000)
+ public void durableTableLosesNonDurableWrites() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default is durable writes
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
+ // write with no durability
+ BatchWriterConfig cfg = new BatchWriterConfig();
+ cfg.setDurability(Durability.NONE);
+ writeSome(tableName, 10, cfg);
+ // verify writes are lost on restart
+ restartTServer();
+ assertTrue(10 > count(tableName));
+ }
+
+ private int count(String tableName) throws Exception {
+ return Iterators.size(getConnector().createScanner(tableName, Authorizations.EMPTY).iterator());
+ }
+
+ private void writeSome(String tableName, int n, BatchWriterConfig cfg) throws Exception {
+ Connector c = getConnector();
+ BatchWriter bw = c.createBatchWriter(tableName, cfg);
+ for (int i = 0; i < n; i++) {
+ Mutation m = new Mutation(i + "");
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
+ @Test(timeout = 3 * 60 * 1000)
+ public void testConditionDurability() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default is durable writes
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "sync");
+ // write without durability
+ ConditionalWriterConfig cfg = new ConditionalWriterConfig();
+ cfg.setDurability(Durability.NONE);
+ conditionWriteSome(tableName, 10, cfg);
+ // everything in there?
+ assertEquals(10, count(tableName));
+ // restart the server and verify the updates are lost
+ restartTServer();
+ assertEquals(0, count(tableName));
+ }
+
+ @Test(timeout = 3 * 60 * 1000)
+ public void testConditionDurability2() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ // table default is durable writes
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_DURABILITY.getKey(), "none");
+ // write with durability
+ ConditionalWriterConfig cfg = new ConditionalWriterConfig();
+ cfg.setDurability(Durability.SYNC);
+ conditionWriteSome(tableName, 10, cfg);
+ // everything in there?
+ assertEquals(10, count(tableName));
+ // restart the server and verify the updates are still there
+ restartTServer();
+ assertEquals(10, count(tableName));
+ }
+
+ private void conditionWriteSome(String tableName, int n, ConditionalWriterConfig cfg) throws Exception {
+ Connector c = getConnector();
+ ConditionalWriter cw = c.createConditionalWriter(tableName, cfg);
+ for (int i = 0; i < n; i++) {
+ ConditionalMutation m = new ConditionalMutation((CharSequence) (i + ""), new Condition("", ""));
+ m.put("", "", "X");
+ assertEquals(Status.ACCEPTED, cw.write(m).getStatus());
+ }
+ }
+
+ private void restartTServer() throws Exception {
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.start();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
index 44473b0,bfca75b..bc36257
--- a/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WriteAheadLogIT.java
@@@ -35,13 -35,15 +35,13 @@@ public class WriteAheadLogIT extends Ac
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- Map<String,String> siteConfig = new HashMap<String,String>();
- siteConfig.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "2M");
- siteConfig.put(Property.GC_CYCLE_DELAY.getKey(), "1");
- siteConfig.put(Property.GC_CYCLE_START.getKey(), "1");
- siteConfig.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s");
- siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1");
- siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "4s");
- cfg.setSiteConfig(siteConfig);
- cfg.useMiniDFS(true);
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+ cfg.setProperty(Property.GC_CYCLE_START, "1");
+ cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "1s");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "4s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
index f852901,f852901..fefb9a6
--- a/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/ZookeeperRestartIT.java
@@@ -45,7 -45,7 +45,7 @@@ public class ZookeeperRestartIT extend
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
Map<String,String> siteConfig = new HashMap<String,String>();
-- siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "3s");
++ siteConfig.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s");
cfg.setSiteConfig(siteConfig);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94f4a19c/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
index bd10f90,0000000..35bc0fe
mode 100644,000000..100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java
@@@ -1,733 -1,0 +1,733 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Replication tests which start at least two MAC instances and replicate data between them
+ */
+public class MultiInstanceReplicationIT extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class);
+
+ private ExecutorService executor;
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 10 * 60;
+ }
+
+ @Before
+ public void createExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void stopExecutor() {
+ if (null != executor) {
+ executor.shutdownNow();
+ }
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
+ cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
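+ // Raw local file system supports the sync/flush semantics the write-ahead logs need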
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ /**
+ * Configure the peer MAC with the same SSL and credential provider settings that AbstractMacIT applied to the primary MAC
+ */
+ private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl primaryCfg, MiniAccumuloConfigImpl peerCfg) {
+ // Set the same SSL information from the primary when present
+ Map<String,String> primarySiteConfig = primaryCfg.getSiteConfig();
+ if ("true".equals(primarySiteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
+ Map<String,String> peerSiteConfig = new HashMap<String,String>();
+ peerSiteConfig.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
+ String keystorePath = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
+ Assert.assertNotNull("Keystore Path was null", keystorePath);
+ peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), keystorePath);
+ String truststorePath = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
+ Assert.assertNotNull("Truststore Path was null", truststorePath);
+ peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), truststorePath);
+
+ // Passwords might be stored in CredentialProvider
+ String keystorePassword = primarySiteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
+ if (null != keystorePassword) {
+ peerSiteConfig.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), keystorePassword);
+ }
+ String truststorePassword = primarySiteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
+ if (null != truststorePassword) {
+ peerSiteConfig.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), truststorePassword);
+ }
+
+ log.info("Setting site configuration for peer {}", peerSiteConfig);
+ peerCfg.setSiteConfig(peerSiteConfig);
+ }
+
+ // Use the CredentialProvider if the primary also uses one
+ String credProvider = primarySiteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
+ if (null != credProvider) {
+ Map<String,String> peerSiteConfig = peerCfg.getSiteConfig();
+ peerSiteConfig.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), credProvider);
+ peerCfg.setSiteConfig(peerSiteConfig);
+ }
+ }
+
+ @Test
+ public void dataWasReplicatedToThePeer() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
+ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
+
+ peerCluster.start();
+
+ try {
+ final Connector connMaster = getConnector();
+ final Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ ReplicationTable.setOnline(connMaster);
+
+ String peerUserName = "peer", peerPassword = "foo";
+
+ String peerClusterName = "peer";
+
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+ final String masterTable = "master", peerTable = "peer";
+
+ connMaster.tableOperations().create(masterTable);
+ String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+ Assert.assertNotNull(masterTableId);
+
+ connPeer.tableOperations().create(peerTable);
+ String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+ Assert.assertNotNull(peerTableId);
+
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+
+ // Replicate the master table to the peer cluster, writing into the peer table identified by peerTableId
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
+
+ // Write some data to the master table
+ BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+ for (int rows = 0; rows < 5000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+ final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+
+ log.info("Files to replicate: " + filesNeedingReplication);
+
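+ // Restart the lone tserver to force the open write-ahead log to close, making it eligible for replication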
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("TabletServer restarted");
+ Iterators.size(ReplicationTable.getScanner(connMaster).iterator());
+ log.info("TabletServer is online");
+
+ while (!ReplicationTable.isOnline(connMaster)) {
+ log.info("Replication table still offline, waiting");
+ Thread.sleep(5000);
+ }
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
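+ // drain() blocks until the given files have been fully replicated; run it in another thread so the wait can be capped at 60 seconds below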
+ Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ long then = System.currentTimeMillis();
+ connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
+ long now = System.currentTimeMillis();
+ log.info("Drain completed in " + (now - then) + "ms");
+ return true;
+ }
+
+ });
+
+ try {
+ future.get(60, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ Assert.fail("Drain did not finish within 60 seconds");
+ } finally {
+ executor.shutdownNow();
+ }
+
+ log.info("drain completed");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
+ Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+ Entry<Key,Value> masterEntry = null, peerEntry = null;
+ while (masterIter.hasNext() && peerIter.hasNext()) {
+ masterEntry = masterIter.next();
+ peerEntry = peerIter.next();
+ Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+ masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+ Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+ }
+
+ log.info("Last master entry: " + masterEntry);
+ log.info("Last peer entry: " + peerEntry);
+
+ Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+ Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+ } finally {
+ peerCluster.stop();
+ }
+ }
+
+ @Test
+ public void dataReplicatedToCorrectTable() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
+ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
+
+ peer1Cluster.start();
+
+ try {
+ Connector connMaster = getConnector();
+ Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ String peerClusterName = "peer";
+ String peerUserName = "peer", peerPassword = "foo";
+
+ // Create local user
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+ String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+ // Create tables
+ connMaster.tableOperations().create(masterTable1);
+ String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+ Assert.assertNotNull(masterTableId1);
+
+ connMaster.tableOperations().create(masterTable2);
+ String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+ Assert.assertNotNull(masterTableId2);
+
+ connPeer.tableOperations().create(peerTable1);
+ String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+ Assert.assertNotNull(peerTableId1);
+
+ connPeer.tableOperations().create(peerTable2);
+ String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+ Assert.assertNotNull(peerTableId2);
+
+ // Grant write permission
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
+ // Replicate each master table to the peer cluster, writing into the peer table with the matching table id
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
+
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
+
+ // Write some data to table1
+ BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+ long masterTable1Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable1 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable1Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ // Write some data to table2
+ bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+ long masterTable2Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable2 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable2Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+ Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+ masterTable2);
+
+ log.info("Files to replicate for table1: " + filesFor1);
+ log.info("Files to replicate for table2: " + filesFor2);
+
+ // Restart the tserver to force a close on the WAL
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("Restarted the tserver");
+
+ // Read the data -- the tserver is back up and running
+ Iterators.size(connMaster.createScanner(masterTable1, Authorizations.EMPTY).iterator());
+
+ while (!ReplicationTable.isOnline(connMaster)) {
+ log.info("Replication table still offline, waiting");
+ Thread.sleep(5000);
+ }
+
+ // Wait for both tables to be replicated
+ log.info("Waiting for {} for {}", filesFor1, masterTable1);
+ connMaster.replicationOperations().drain(masterTable1, filesFor1);
+
+ log.info("Waiting for {} for {}", filesFor2, masterTable2);
+ connMaster.replicationOperations().drain(masterTable2, filesFor2);
+
+ long countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable1));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable1);
+ Assert.assertEquals(masterTable1Records, countTable);
+
+ countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable2));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable2);
+ Assert.assertEquals(masterTable2Records, countTable);
+
+ } finally {
+ peer1Cluster.stop();
+ }
+ }
+
+ @Test
+ public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
+ MiniAccumuloClusterImpl peerCluster = new MiniAccumuloClusterImpl(peerCfg);
+
+ peerCluster.start();
+
+ Connector connMaster = getConnector();
+ Connector connPeer = peerCluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ String peerUserName = "repl";
+ String peerPassword = "passwd";
+
+ // Create a user on the peer for replication to use
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ String peerClusterName = "peer";
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+ // Configure the credentials we should use to authenticate ourselves to the peer for replication
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ String masterTable = "master", peerTable = "peer";
+
+ connMaster.tableOperations().create(masterTable);
+ String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+ Assert.assertNotNull(masterTableId);
+
+ connPeer.tableOperations().create(peerTable);
+ String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+ Assert.assertNotNull(peerTableId);
+
+ // Give our replication user the ability to write to the table
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+
+ // Replicate the master table to the peer cluster, writing into the peer table identified by peerTableId
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId);
+
+ // Write some data to the master table
+ BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+ for (int rows = 0; rows < 5000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+ Set<String> files = connMaster.replicationOperations().referencedFiles(masterTable);
+
+ log.info("Files to replicate:" + files);
+
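+ // Restart the tserver to close the current write-ahead log so it can be replicated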
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+
+ cluster.exec(TabletServer.class);
+
+ while (!ReplicationTable.isOnline(connMaster)) {
+ log.info("Replication table still offline, waiting");
+ Thread.sleep(5000);
+ }
+
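+ // Read the master table to confirm the restarted tserver is back up and serving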
+ Iterators.size(connMaster.createScanner(masterTable, Authorizations.EMPTY).iterator());
+
+ for (Entry<Key,Value> kv : ReplicationTable.getScanner(connMaster)) {
+ log.debug(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
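+ // Block until the referenced files have been replicated to the peer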
+ connMaster.replicationOperations().drain(masterTable, files);
+
+ Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+ while (masterIter.hasNext() && peerIter.hasNext()) {
+ Entry<Key,Value> masterEntry = masterIter.next(), peerEntry = peerIter.next();
+ Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+ masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+ Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+ }
+
+ Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+ Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+
+ peerCluster.stop();
+ }
+
+ @Test
+ public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+
+ updatePeerConfigFromPrimary(getCluster().getConfig(), peerCfg);
+
+ MiniAccumuloClusterImpl peer1Cluster = new MiniAccumuloClusterImpl(peerCfg);
+
+ peer1Cluster.start();
+
+ try {
+ Connector connMaster = getConnector();
+ Connector connPeer = peer1Cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ String peerClusterName = "peer";
+
+ String peerUserName = "repl";
+ String peerPassword = "passwd";
+
+ // Create a user on the peer for replication to use
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ // Configure the credentials we should use to authenticate ourselves to the peer for replication
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+ String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+ connMaster.tableOperations().create(masterTable1);
+ String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+ Assert.assertNotNull(masterTableId1);
+
+ connMaster.tableOperations().create(masterTable2);
+ String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+ Assert.assertNotNull(masterTableId2);
+
+ connPeer.tableOperations().create(peerTable1);
+ String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+ Assert.assertNotNull(peerTableId1);
+
+ connPeer.tableOperations().create(peerTable2);
+ String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+ Assert.assertNotNull(peerTableId2);
+
+ // Give our replication user the ability to write to the tables
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
+ // Replicate each master table to the peer cluster, writing into the peer table with the matching table id
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId1);
+
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGET.getKey() + peerClusterName, peerTableId2);
+
+ // Write some data to table1
+ BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable1 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ // Write some data to table2
+ bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable2 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
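+ // Bounce the tserver so the open write-ahead log is closed and queued for replication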
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+
+ cluster.exec(TabletServer.class);
+
+ while (!ReplicationTable.isOnline(connMaster)) {
+ log.info("Replication table still offline, waiting");
+ Thread.sleep(5000);
+ }
+
+ // Wait until we fully replicated something
+ boolean fullyReplicated = false;
+ for (int i = 0; i < 10 && !fullyReplicated; i++) {
+ UtilWaitThread.sleep(2000);
+
+ Scanner s = ReplicationTable.getScanner(connMaster);
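+ // Restrict the scan to the work section, which tracks per-file replication status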
+ WorkSection.limit(s);
+ for (Entry<Key,Value> entry : s) {
+ Status status = Status.parseFrom(entry.getValue().get());
+ if (StatusUtil.isFullyReplicated(status)) {
+ fullyReplicated = true;
+ }
+ }
+ }
+
+ Assert.assertTrue("Did not fully replicate anything to the peer", fullyReplicated);
+
+ // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it
+ // Be cautious in how quickly we assert that the data is present on the peer
+ long countTable = 0L;
+ for (int i = 0; i < 10; i++) {
+ countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable1));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable1);
+
+ if (0L == countTable) {
+ Thread.sleep(5000);
+ } else {
+ break;
+ }
+ }
+
+ Assert.assertTrue("Found no records in " + peerTable1 + " in the peer cluster", countTable > 0);
+
+ // We have to wait for the master to assign the replication work, a local tserver to process it, and then the remote tserver to replay it
+ // Be cautious in how quickly we assert that the data is present on the peer
+ for (int i = 0; i < 10; i++) {
+ countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable2));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable2);
+
+ if (0L == countTable) {
+ Thread.sleep(5000);
+ } else {
+ break;
+ }
+ }
+
+ Assert.assertTrue("Found no records in " + peerTable2 + " in the peer cluster", countTable > 0);
+
+ } finally {
+ peer1Cluster.stop();
+ }
+ }
+}