You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/01/12 20:51:46 UTC
[13/13] accumulo git commit: Merge branch '1.7'
Merge branch '1.7'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/081eb1fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/081eb1fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/081eb1fa
Branch: refs/heads/master
Commit: 081eb1facb7d7402add733b332f5a33152e19e81
Parents: 18725dd 94f4a19
Author: Josh Elser <el...@apache.org>
Authored: Tue Jan 12 12:25:40 2016 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Tue Jan 12 14:51:20 2016 -0500
----------------------------------------------------------------------
test/src/main/java/org/apache/accumulo/test/CleanWalIT.java | 2 +-
.../java/org/apache/accumulo/test/DetectDeadTabletServersIT.java | 2 +-
test/src/main/java/org/apache/accumulo/test/ExistingMacIT.java | 2 +-
.../org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java | 2 +-
test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java | 2 +-
.../main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java | 2 +-
.../apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java | 2 +-
.../java/org/apache/accumulo/test/TabletServerGivesUpIT.java | 2 +-
.../java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java | 2 +-
.../java/org/apache/accumulo/test/VerifySerialRecoveryIT.java | 2 +-
test/src/main/java/org/apache/accumulo/test/VolumeIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/BinaryStressIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/CleanTmpIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/CompactionIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/DurabilityIT.java | 2 +-
.../org/apache/accumulo/test/functional/GarbageCollectorIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/KerberosIT.java | 2 +-
.../org/apache/accumulo/test/functional/KerberosRenewalIT.java | 2 +-
.../org/apache/accumulo/test/functional/MasterFailoverIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/ReadWriteIT.java | 2 +-
.../main/java/org/apache/accumulo/test/functional/RestartIT.java | 2 +-
.../org/apache/accumulo/test/functional/RestartStressIT.java | 2 +-
.../org/apache/accumulo/test/functional/SessionDurabilityIT.java | 2 +-
.../java/org/apache/accumulo/test/functional/WALSunnyDayIT.java | 2 +-
.../org/apache/accumulo/test/functional/WriteAheadLogIT.java | 2 +-
.../org/apache/accumulo/test/functional/ZookeeperRestartIT.java | 2 +-
.../java/org/apache/accumulo/test/proxy/ProxyDurabilityIT.java | 4 ++--
.../replication/GarbageCollectorCommunicatesWithTServersIT.java | 2 +-
.../accumulo/test/replication/MultiInstanceReplicationIT.java | 2 +-
.../java/org/apache/accumulo/test/replication/ReplicationIT.java | 3 +--
.../test/replication/UnorderedWorkAssignerReplicationIT.java | 2 +-
31 files changed, 32 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/CleanWalIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/CleanWalIT.java
index 91b929f,0000000..7146a9f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/CleanWalIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/CleanWalIT.java
@@@ -1,147 -1,0 +1,147 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class CleanWalIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(CleanWalIT.class);
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 4 * 60;
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setNumTservers(1);
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Before
+ public void offlineTraceTable() throws Exception {
+ Connector conn = getConnector();
+ String traceTable = conn.instanceOperations().getSystemConfiguration().get(Property.TRACE_TABLE.getKey());
+ if (conn.tableOperations().exists(traceTable)) {
+ conn.tableOperations().offline(traceTable, true);
+ }
+ }
+
+ @After
+ public void onlineTraceTable() throws Exception {
+ if (null != cluster) {
+ Connector conn = getConnector();
+ String traceTable = conn.instanceOperations().getSystemConfiguration().get(Property.TRACE_TABLE.getKey());
+ if (conn.tableOperations().exists(traceTable)) {
+ conn.tableOperations().online(traceTable, true);
+ }
+ }
+ }
+
+ // test for ACCUMULO-1830
+ @Test
+ public void test() throws Exception {
+ Connector conn = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("row");
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
+ bw.close();
+ getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ // all 3 tables should do recovery, but the bug doesn't really remove the log file references
+
+ getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+
+ for (String table : new String[] {MetadataTable.NAME, RootTable.NAME})
+ conn.tableOperations().flush(table, null, null, true);
+ log.debug("Checking entries for " + tableName);
+ assertEquals(1, count(tableName, conn));
+ for (String table : new String[] {MetadataTable.NAME, RootTable.NAME}) {
+ log.debug("Checking logs for " + table);
+ assertEquals("Found logs for " + table, 0, countLogs(table, conn));
+ }
+
+ bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ m = new Mutation("row");
+ m.putDelete("cf", "cq");
+ bw.addMutation(m);
+ bw.close();
+ assertEquals(0, count(tableName, conn));
+ conn.tableOperations().flush(tableName, null, null, true);
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ conn.tableOperations().flush(RootTable.NAME, null, null, true);
+ try {
+ getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ sleepUninterruptibly(3, TimeUnit.SECONDS);
+ } finally {
+ getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ assertEquals(0, count(tableName, conn));
+ }
+
+ private int countLogs(String tableName, Connector conn) throws TableNotFoundException {
+ Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange());
+ int count = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ log.debug("Saw " + entry.getKey() + "=" + entry.getValue());
+ count++;
+ }
+ return count;
+ }
+
+ int count(String tableName, Connector conn) throws Exception {
+ Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+ return Iterators.size(s.iterator());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
index e4e0962,0000000..f207353
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/DetectDeadTabletServersIT.java
@@@ -1,97 -1,0 +1,97 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class DetectDeadTabletServersIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ }
+
+ @Test
+ public void test() throws Exception {
+ Connector c = getConnector();
+ log.info("verifying that everything is up");
+ Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+
+ MasterMonitorInfo stats = getStats(c);
+ assertEquals(2, stats.tServerInfo.size());
+ assertEquals(0, stats.badTServers.size());
+ assertEquals(0, stats.deadTabletServers.size());
+ log.info("Killing a tablet server");
+ getCluster().killProcess(TABLET_SERVER, getCluster().getProcesses().get(TABLET_SERVER).iterator().next());
+
+ while (true) {
+ stats = getStats(c);
+ if (2 != stats.tServerInfo.size()) {
+ break;
+ }
+ UtilWaitThread.sleep(500);
+ }
+ assertEquals(1, stats.tServerInfo.size());
+ assertEquals(1, stats.badTServers.size() + stats.deadTabletServers.size());
+ while (true) {
+ stats = getStats(c);
+ if (0 != stats.deadTabletServers.size()) {
+ break;
+ }
+ UtilWaitThread.sleep(500);
+ }
+ assertEquals(1, stats.tServerInfo.size());
+ assertEquals(0, stats.badTServers.size());
+ assertEquals(1, stats.deadTabletServers.size());
+ }
+
+ private MasterMonitorInfo getStats(Connector c) throws Exception {
+ Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
+ ClientContext context = new ClientContext(c.getInstance(), creds, getClientConfig());
+ Client client = null;
+ try {
+ client = MasterClient.getConnectionWithRetry(context);
+ log.info("Fetching master stats");
+ return client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+ } finally {
+ if (client != null) {
+ MasterClient.close(client);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/ExistingMacIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/ExistingMacIT.java
index d4f4d58,0000000..7984393
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/ExistingMacIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExistingMacIT.java
@@@ -1,171 -1,0 +1,171 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class ExistingMacIT extends ConfigurableMacBase {
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ private void createEmptyConfig(File confFile) throws IOException {
+ Configuration conf = new Configuration(false);
+ OutputStream hcOut = new FileOutputStream(confFile);
+ conf.writeXml(hcOut);
+ hcOut.close();
+ }
+
+ @Test
+ public void testExistingInstance() throws Exception {
+
+ Connector conn = getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ conn.tableOperations().create("table1");
+
+ BatchWriter bw = conn.createBatchWriter("table1", new BatchWriterConfig());
+
+ Mutation m1 = new Mutation("00081");
+ m1.put("math", "sqroot", "9");
+ m1.put("math", "sq", "6560");
+
+ bw.addMutation(m1);
+ bw.close();
+
+ conn.tableOperations().flush("table1", null, null, true);
+ // TOOD use constants
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ conn.tableOperations().flush(RootTable.NAME, null, null, true);
+
+ Set<Entry<ServerType,Collection<ProcessReference>>> procs = getCluster().getProcesses().entrySet();
+ for (Entry<ServerType,Collection<ProcessReference>> entry : procs) {
+ if (entry.getKey() == ServerType.ZOOKEEPER)
+ continue;
+ for (ProcessReference pr : entry.getValue())
+ getCluster().killProcess(entry.getKey(), pr);
+ }
+
+ // TODO clean out zookeeper? following sleep waits for ephemeral nodes to go away
+ sleepUninterruptibly(10, TimeUnit.SECONDS);
+
+ File hadoopConfDir = createTestDir(ExistingMacIT.class.getSimpleName() + "_hadoop_conf");
+ FileUtils.deleteQuietly(hadoopConfDir);
+ assertTrue(hadoopConfDir.mkdirs());
+ createEmptyConfig(new File(hadoopConfDir, "core-site.xml"));
+ createEmptyConfig(new File(hadoopConfDir, "hdfs-site.xml"));
+
+ File testDir2 = createTestDir(ExistingMacIT.class.getSimpleName() + "_2");
+ FileUtils.deleteQuietly(testDir2);
+
+ MiniAccumuloConfigImpl macConfig2 = new MiniAccumuloConfigImpl(testDir2, "notused");
+ macConfig2.useExistingInstance(new File(getCluster().getConfig().getConfDir(), "accumulo-site.xml"), hadoopConfDir);
+
+ MiniAccumuloClusterImpl accumulo2 = new MiniAccumuloClusterImpl(macConfig2);
+ accumulo2.start();
+
+ conn = accumulo2.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+
+ Scanner scanner = conn.createScanner("table1", Authorizations.EMPTY);
+
+ int sum = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ sum += Integer.parseInt(entry.getValue().toString());
+ }
+
+ Assert.assertEquals(6569, sum);
+
+ accumulo2.stop();
+ }
+
+ @Test
+ public void testExistingRunningInstance() throws Exception {
+ final String table = getUniqueNames(1)[0];
+ Connector conn = getConnector();
+ // Ensure that a master and tserver are up so the existing instance check won't fail.
+ conn.tableOperations().create(table);
+ BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+ Mutation m = new Mutation("foo");
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
+ bw.close();
+
+ File hadoopConfDir = createTestDir(ExistingMacIT.class.getSimpleName() + "_hadoop_conf_2");
+ FileUtils.deleteQuietly(hadoopConfDir);
+ assertTrue(hadoopConfDir.mkdirs());
+ createEmptyConfig(new File(hadoopConfDir, "core-site.xml"));
+ createEmptyConfig(new File(hadoopConfDir, "hdfs-site.xml"));
+
+ File testDir2 = createTestDir(ExistingMacIT.class.getSimpleName() + "_3");
+ FileUtils.deleteQuietly(testDir2);
+
+ MiniAccumuloConfigImpl macConfig2 = new MiniAccumuloConfigImpl(testDir2, "notused");
+ macConfig2.useExistingInstance(new File(getCluster().getConfig().getConfDir(), "accumulo-site.xml"), hadoopConfDir);
+
+ System.out.println("conf " + new File(getCluster().getConfig().getConfDir(), "accumulo-site.xml"));
+
+ MiniAccumuloClusterImpl accumulo2 = new MiniAccumuloClusterImpl(macConfig2);
+ try {
+ accumulo2.start();
+ Assert.fail("A 2nd MAC instance should not be able to start over an existing MAC instance");
+ } catch (RuntimeException e) {
+ // TODO check message or throw more explicit exception
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
index a4f067e,0000000..e54e451
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@@ -1,162 -1,0 +1,162 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.state.ClosableIterator;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 5 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "5s");
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test
+ public void test() throws Exception {
+ // make some tablets, spread 'em around
+ Connector c = getConnector();
+ ClientContext context = new ClientContext(c.getInstance(), new Credentials("root", new PasswordToken(ROOT_PASSWORD)), getClientConfig());
+ String table = this.getUniqueNames(1)[0];
+ c.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+ c.securityOperations().grantTablePermission("root", RootTable.NAME, TablePermission.WRITE);
+ c.tableOperations().create(table);
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ for (String part : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
+ partitions.add(new Text(part));
+ }
+ c.tableOperations().addSplits(table, partitions);
+ // scan the metadata table and get the two table location states
+ Set<TServerInstance> states = new HashSet<TServerInstance>();
+ Set<TabletLocationState> oldLocations = new HashSet<TabletLocationState>();
+ MetaDataStateStore store = new MetaDataStateStore(context, null);
+ while (states.size() < 2) {
+ UtilWaitThread.sleep(250);
+ oldLocations.clear();
+ for (TabletLocationState tls : store) {
+ if (tls.current != null) {
+ states.add(tls.current);
+ oldLocations.add(tls);
+ }
+ }
+ }
+ assertEquals(2, states.size());
+ // Kill a tablet server... we don't care which one... wait for everything to be reassigned
+ cluster.killProcess(ServerType.TABLET_SERVER, cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+ Set<TServerInstance> replStates = new HashSet<>();
+ // Find out which tablet server remains
+ while (true) {
+ UtilWaitThread.sleep(1000);
+ states.clear();
+ replStates.clear();
+ boolean allAssigned = true;
+ for (TabletLocationState tls : store) {
+ if (tls != null && tls.current != null) {
+ states.add(tls.current);
+ } else if (tls != null && tls.extent.equals(new KeyExtent(new Text(ReplicationTable.ID), null, null))) {
+ replStates.add(tls.current);
+ } else {
+ allAssigned = false;
+ }
+ }
+ System.out.println(states + " size " + states.size() + " allAssigned " + allAssigned);
+ if (states.size() != 2 && allAssigned == true)
+ break;
+ }
+ assertEquals(1, replStates.size());
+ assertEquals(1, states.size());
+ // pick an assigned tablet and assign it to the old tablet
+ TabletLocationState moved = null;
+ for (TabletLocationState old : oldLocations) {
+ if (!states.contains(old.current)) {
+ moved = old;
+ }
+ }
+ assertNotEquals(null, moved);
+ // throw a mutation in as if we were the dying tablet
+ BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation assignment = new Mutation(moved.extent.getMetadataEntry());
+ moved.current.putLocation(assignment);
+ bw.addMutation(assignment);
+ bw.close();
+ // wait for the master to fix the problem
+ waitForCleanStore(store);
+ // now jam up the metadata table
+ bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ assignment = new Mutation(new KeyExtent(new Text(MetadataTable.ID), null, null).getMetadataEntry());
+ moved.current.putLocation(assignment);
+ bw.addMutation(assignment);
+ bw.close();
+ waitForCleanStore(new RootTabletStateStore(context, null));
+ }
+
+ private void waitForCleanStore(MetaDataStateStore store) {
+ while (true) {
+ try (ClosableIterator<TabletLocationState> iter = store.iterator()) {
+ Iterators.size(iter);
+ } catch (Exception ex) {
+ System.out.println(ex);
+ UtilWaitThread.sleep(250);
+ continue;
+ }
+ break;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java
index 9f93381,0000000..0c16a5f
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MetaRecoveryIT.java
@@@ -1,96 -1,0 +1,96 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// ACCUMULO-3211
+public class MetaRecoveryIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.TSERV_ARCHIVE_WALOGS, "true");
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1048576");
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void test() throws Exception {
+ String[] tables = getUniqueNames(10);
+ Connector c = getConnector();
+ int i = 0;
+ for (String table : tables) {
+ log.info("Creating table {}", i);
+ c.tableOperations().create(table);
+ BatchWriter bw = c.createBatchWriter(table, null);
+ for (int j = 0; j < 1000; j++) {
+ Mutation m = new Mutation("" + j);
+ m.put("cf", "cq", "value");
+ bw.addMutation(m);
+ }
+ bw.close();
+ log.info("Data written to table {}", i);
+ i++;
+ }
+ c.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ c.tableOperations().flush(RootTable.NAME, null, null, true);
+ SortedSet<Text> splits = new TreeSet<>();
+ for (i = 1; i < tables.length; i++) {
+ splits.add(new Text("" + i));
+ }
+ c.tableOperations().addSplits(MetadataTable.NAME, splits);
+ log.info("Added {} splits to {}", splits.size(), MetadataTable.NAME);
+ c.instanceOperations().waitForBalance();
+ log.info("Restarting");
+ getCluster().getClusterControl().kill(ServerType.TABLET_SERVER, "localhost");
+ getCluster().start();
+ log.info("Verifying");
+ for (String table : tables) {
+ BatchScanner scanner = c.createBatchScanner(table, Authorizations.EMPTY, 5);
+ scanner.setRanges(Collections.singletonList(new Range()));
+ assertEquals(1000, Iterators.size(scanner.iterator()));
+ scanner.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
index d584613,0000000..e62b5ad
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultiTableRecoveryIT.java
@@@ -1,135 -1,0 +1,135 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class MultiTableRecoveryIT extends ConfigurableMacBase {
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testRecoveryOverMultipleTables() throws Exception {
+ final int N = 3;
+ final Connector c = getConnector();
+ final String[] tables = getUniqueNames(N);
+ final BatchWriter[] writers = new BatchWriter[N];
+ final byte[][] values = new byte[N][];
+ int i = 0;
+ System.out.println("Creating tables");
+ for (String tableName : tables) {
+ c.tableOperations().create(tableName);
+ values[i] = Integer.toString(i).getBytes();
+ writers[i] = c.createBatchWriter(tableName, null);
+ i++;
+ }
+ System.out.println("Creating agitator");
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ final Thread agitator = agitator(stop);
+ agitator.start();
+ System.out.println("writing");
+ final Random random = new Random();
+ for (i = 0; i < 1_000_000; i++) {
+ // make non-negative avoiding Math.abs, because that can still be negative
+ long randomRow = random.nextLong() & Long.MAX_VALUE;
+ assertTrue(randomRow >= 0);
+ final int table = (int) (randomRow % N);
+ final Mutation m = new Mutation(Long.toHexString(randomRow));
+ m.put(new byte[0], new byte[0], values[table]);
+ writers[table].addMutation(m);
+ if (i % 10_000 == 0) {
+ System.out.println("flushing");
+ for (int w = 0; w < N; w++) {
+ writers[w].flush();
+ }
+ }
+ }
+ System.out.println("closing");
+ for (int w = 0; w < N; w++) {
+ writers[w].close();
+ }
+ System.out.println("stopping the agitator");
+ stop.set(true);
+ agitator.join();
+ System.out.println("checking the data");
+ long count = 0;
+ for (int w = 0; w < N; w++) {
+ Scanner scanner = c.createScanner(tables[w], Authorizations.EMPTY);
+ for (Entry<Key,Value> entry : scanner) {
+ int value = Integer.parseInt(entry.getValue().toString());
+ assertEquals(w, value);
+ count++;
+ }
+ scanner.close();
+ }
+ assertEquals(1_000_000, count);
+ }
+
+ private Thread agitator(final AtomicBoolean stop) {
+ return new Thread() {
+ @Override
+ public void run() {
+ try {
+ int i = 0;
+ while (!stop.get()) {
+ sleepUninterruptibly(10, TimeUnit.SECONDS);
+ System.out.println("Restarting");
+ getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+ getCluster().start();
+ // read the metadata table to know everything is back up
+ Iterators.size(getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+ i++;
+ }
+ System.out.println("Restarted " + i + " times");
+ } catch (Exception ex) {
+ log.error("{}", ex.getMessage(), ex);
+ }
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
index 60b3cf7,0000000..f79e174
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/RecoveryCompactionsAreFlushesIT.java
@@@ -1,101 -1,0 +1,101 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.cluster.ClusterControl;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// Accumulo3010
+public class RecoveryCompactionsAreFlushesIT extends AccumuloClusterHarness {
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ // file system supports recovery
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test
+ public void test() throws Exception {
+ // create a table
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "100");
+ c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "3");
+ // create 3 flush files
+ BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation("a");
+ m.put("b", "c", new Value("v".getBytes()));
+ for (int i = 0; i < 3; i++) {
+ bw.addMutation(m);
+ bw.flush();
+ c.tableOperations().flush(tableName, null, null, true);
+ }
+ // create an unsaved mutation
+ bw.addMutation(m);
+ bw.close();
+
+ ClusterControl control = cluster.getClusterControl();
+
+ // kill the tablet servers
+ control.stopAllServers(ServerType.TABLET_SERVER);
+
+ // recover
+ control.startAllServers(ServerType.TABLET_SERVER);
+
+ // ensure the table is readable
+ Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
+
+ // ensure that the recovery was not a merging minor compaction
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ for (Entry<Key,Value> entry : s) {
+ String filename = entry.getKey().getColumnQualifier().toString();
+ String parts[] = filename.split("/");
+ Assert.assertFalse(parts[parts.length - 1].startsWith("M"));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
index f7b11f6,0000000..33c1798
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java
@@@ -1,77 -1,0 +1,77 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+// ACCUMULO-2480
+public class TabletServerGivesUpIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES, "15");
+ cfg.setProperty(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION, "0s");
+ }
+
+ @Test(timeout = 30 * 1000)
+ public void test() throws Exception {
+ final Connector conn = this.getConnector();
+ // Yes, there's a tabletserver
+ assertEquals(1, conn.instanceOperations().getTabletServers().size());
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ // Kill dfs
+ cluster.getMiniDfs().shutdown();
+ // ask the tserver to do something
+ final AtomicReference<Exception> ex = new AtomicReference<>();
+ Thread splitter = new Thread() {
+ @Override
+ public void run() {
+ try {
+ TreeSet<Text> splits = new TreeSet<>();
+ splits.add(new Text("X"));
+ conn.tableOperations().addSplits(tableName, splits);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ };
+ splitter.start();
+ // wait for the tserver to give up on writing to the WAL
+ while (conn.instanceOperations().getTabletServers().size() == 1) {
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
index e92c1c5,0000000..1e063f0
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java
@@@ -1,67 -1,0 +1,67 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// ACCUMULO-3914
+public class TabletServerHdfsRestartIT extends ConfigurableMacBase {
+
+ private static final int N = 1000;
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.useMiniDFS(true);
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ }
+
+ @Test(timeout = 2 * 60 * 1000)
+ public void test() throws Exception {
+ final Connector conn = this.getConnector();
+ // Yes, there's a tabletserver
+ assertEquals(1, conn.instanceOperations().getTabletServers().size());
+ final String tableName = getUniqueNames(1)[0];
+ conn.tableOperations().create(tableName);
+ BatchWriter bw = conn.createBatchWriter(tableName, null);
+ for (int i = 0; i < N; i++) {
+ Mutation m = new Mutation("" + i);
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ conn.tableOperations().flush(tableName, null, null, true);
+
+ // Kill dfs
+ cluster.getMiniDfs().restartNameNode(false);
+
+ assertEquals(N, Iterators.size(conn.createScanner(tableName, Authorizations.EMPTY).iterator()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
index 6a90730,0000000..1204ee0
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VerifySerialRecoveryIT.java
@@@ -1,107 -1,0 +1,107 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class VerifySerialRecoveryIT extends ConfigurableMacBase {
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(1);
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "20");
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test(timeout = 4 * 60 * 1000)
+ public void testSerializedRecovery() throws Exception {
+ // make a table with many splits
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ c.tableOperations().create(tableName);
+ SortedSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 200; i++) {
+ splits.add(new Text(AssignmentThreadsIT.randomHex(8)));
+ }
+ c.tableOperations().addSplits(tableName, splits);
+ // load data to give the recovery something to do
+ BatchWriter bw = c.createBatchWriter(tableName, null);
+ for (int i = 0; i < 50000; i++) {
+ Mutation m = new Mutation(AssignmentThreadsIT.randomHex(8));
+ m.put("", "", "");
+ bw.addMutation(m);
+ }
+ bw.close();
+ // kill the tserver
+ for (ProcessReference ref : getCluster().getProcesses().get(ServerType.TABLET_SERVER))
+ getCluster().killProcess(ServerType.TABLET_SERVER, ref);
+ final Process ts = cluster.exec(TabletServer.class);
+
+ // wait for recovery
+ Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator());
+ assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ ts.waitFor();
+ String result = FunctionalTestUtils.readAll(cluster, TabletServer.class, ts);
+ for (String line : result.split("\n")) {
+ System.out.println(line);
+ }
+ // walk through the output, verifying that only a single normal recovery was running at one time
+ boolean started = false;
+ int recoveries = 0;
+ for (String line : result.split("\n")) {
+ // ignore metadata tables
+ if (line.contains("!0") || line.contains("+r"))
+ continue;
+ if (line.contains("Starting Write-Ahead Log")) {
+ assertFalse(started);
+ started = true;
+ recoveries++;
+ }
+ if (line.contains("Write-Ahead Log recovery complete")) {
+ assertTrue(started);
+ started = false;
+ }
+ }
+ assertFalse(started);
+ assertTrue(recoveries > 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index b325359,0000000..0a06fdf
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@@ -1,568 -1,0 +1,568 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
+import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class VolumeIT extends ConfigurableMacBase {
+
+ private static final Text EMPTY = new Text();
+ private static final Value EMPTY_VALUE = new Value(new byte[] {});
+ private File volDirBase;
+ private Path v1, v2;
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 5 * 60;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ File baseDir = cfg.getDir();
+ volDirBase = new File(baseDir, "volumes");
+ File v1f = new File(volDirBase, "v1");
+ File v2f = new File(volDirBase, "v2");
+ v1 = new Path("file://" + v1f.getAbsolutePath());
+ v2 = new Path("file://" + v2f.getAbsolutePath());
+
+ // Run MAC on two locations in the local file system
+ URI v1Uri = v1.toUri();
+ cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
+ cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
+ cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+
+ super.configure(cfg, hadoopCoreSite);
+ }
+
+ @Test
+ public void test() throws Exception {
+ // create a table
+ Connector connector = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName);
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ // with some splits
+ for (String s : "d,m,t".split(","))
+ partitions.add(new Text(s));
+ connector.tableOperations().addSplits(tableName, partitions);
+ // scribble over the splits
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+ String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+ for (String s : rows) {
+ Mutation m = new Mutation(new Text(s));
+ m.put(EMPTY, EMPTY, EMPTY_VALUE);
+ bw.addMutation(m);
+ }
+ bw.close();
+ // write the data to disk, read it back
+ connector.tableOperations().flush(tableName, null, null, true);
+ Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
+ int i = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ assertEquals(rows[i++], entry.getKey().getRow().toString());
+ }
+ // verify the new files are written to the different volumes
+ scanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(new Range("1", "1<"));
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ int fileCount = 0;
+
+ for (Entry<Key,Value> entry : scanner) {
+ boolean inV1 = entry.getKey().getColumnQualifier().toString().contains(v1.toString());
+ boolean inV2 = entry.getKey().getColumnQualifier().toString().contains(v2.toString());
+ assertTrue(inV1 || inV2);
+ fileCount++;
+ }
+ assertEquals(4, fileCount);
+ List<DiskUsage> diskUsage = connector.tableOperations().getDiskUsage(Collections.singleton(tableName));
+ assertEquals(1, diskUsage.size());
+ long usage = diskUsage.get(0).getUsage().longValue();
+ log.debug("usage {}", usage);
+ assertTrue(usage > 700 && usage < 800);
+ }
+
+ private void verifyData(List<String> expected, Scanner createScanner) {
+
+ List<String> actual = new ArrayList<String>();
+
+ for (Entry<Key,Value> entry : createScanner) {
+ Key k = entry.getKey();
+ actual.add(k.getRow() + ":" + k.getColumnFamily() + ":" + k.getColumnQualifier() + ":" + entry.getValue());
+ }
+
+ Collections.sort(expected);
+ Collections.sort(actual);
+
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testRelativePaths() throws Exception {
+
+ List<String> expected = new ArrayList<String>();
+
+ Connector connector = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ connector.tableOperations().create(tableName, new NewTableConfiguration().withoutDefaultIterators());
+
+ String tableId = connector.tableOperations().tableIdMap().get(tableName);
+
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ // with some splits
+ for (String s : "c,g,k,p,s,v".split(","))
+ partitions.add(new Text(s));
+
+ connector.tableOperations().addSplits(tableName, partitions);
+
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+ // create two files in each tablet
+
+ String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "1");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:1");
+ }
+
+ bw.flush();
+ connector.tableOperations().flush(tableName, null, null, true);
+
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "2");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:2");
+ }
+
+ bw.close();
+ connector.tableOperations().flush(tableName, null, null, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ connector.tableOperations().offline(tableName, true);
+
+ connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+
+ Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ BatchWriter mbw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ if (cq.startsWith(v1.toString())) {
+ Path path = new Path(cq);
+ String relPath = "/" + path.getParent().getName() + "/" + path.getName();
+ Mutation fileMut = new Mutation(entry.getKey().getRow());
+ fileMut.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ fileMut.put(entry.getKey().getColumnFamily().toString(), relPath, entry.getValue().toString());
+ mbw.addMutation(fileMut);
+ }
+ }
+
+ mbw.close();
+
+ connector.tableOperations().online(tableName, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ connector.tableOperations().compact(tableName, null, null, true, true);
+
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ Path path = new Path(cq);
+ Assert.assertTrue("relative path not deleted " + path.toString(), path.depth() > 2);
+ }
+
+ }
+
+ @Test
+ public void testAddVolumes() throws Exception {
+
+ String[] tableNames = getUniqueNames(2);
+
+ // grab this before shutting down cluster
+ String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ cluster.stop();
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ File v3f = new File(volDirBase, "v3");
+ assertTrue(v3f.mkdir() || v3f.isDirectory());
+ Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v1.toString() + "," + v2.toString() + "," + v3.toString());
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // initialize volume
+ Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+ // check that all volumes are initialized
+ for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+ FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+ Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+ FileStatus[] iids = fs.listStatus(vp);
+ Assert.assertEquals(1, iids.length);
+ Assert.assertEquals(uuid, iids[0].getPath().getName());
+ }
+
+ // start cluster and verify that new volume is used
+ cluster.start();
+
+ verifyVolumesUsed(tableNames[1], false, v1, v2, v3);
+ }
+
+ @Test
+ public void testNonConfiguredVolumes() throws Exception {
+
+ String[] tableNames = getUniqueNames(2);
+
+ // grab this before shutting down cluster
+ String uuid = new ZooKeeperInstance(cluster.getClientConfig()).getInstanceID();
+
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ cluster.stop();
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ File v3f = new File(volDirBase, "v3");
+ assertTrue(v3f.mkdir() || v3f.isDirectory());
+ Path v3 = new Path("file://" + v3f.getAbsolutePath());
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString() + "," + v3.toString());
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // initialize volume
+ Assert.assertEquals(0, cluster.exec(Initialize.class, "--add-volumes").waitFor());
+
+ // check that all volumes are initialized
+ for (Path volumePath : Arrays.asList(v1, v2, v3)) {
+ FileSystem fs = volumePath.getFileSystem(CachedConfiguration.getInstance());
+ Path vp = new Path(volumePath, ServerConstants.INSTANCE_ID_DIR);
+ FileStatus[] iids = fs.listStatus(vp);
+ Assert.assertEquals(1, iids.length);
+ Assert.assertEquals(uuid, iids[0].getPath().getName());
+ }
+
+ // start cluster and verify that new volume is used
+ cluster.start();
+
+ // Make sure we can still read the tables (tableNames[0] is very likely to have a file still on v1)
+ List<String> expected = new ArrayList<String>();
+ for (int i = 0; i < 100; i++) {
+ String row = String.format("%06d", i * 100 + 3);
+ expected.add(row + ":cf1:cq1:1");
+ }
+
+ verifyData(expected, getConnector().createScanner(tableNames[0], Authorizations.EMPTY));
+
+ // v1 should not have any data for tableNames[1]
+ verifyVolumesUsed(tableNames[1], false, v2, v3);
+ }
+
+ private void writeData(String tableName, Connector conn) throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException,
+ MutationsRejectedException {
+ TreeSet<Text> splits = new TreeSet<Text>();
+ for (int i = 1; i < 100; i++) {
+ splits.add(new Text(String.format("%06d", i * 100)));
+ }
+
+ conn.tableOperations().create(tableName);
+ conn.tableOperations().addSplits(tableName, splits);
+
+ BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+ for (int i = 0; i < 100; i++) {
+ String row = String.format("%06d", i * 100 + 3);
+ Mutation m = new Mutation(row);
+ m.put("cf1", "cq1", "1");
+ bw.addMutation(m);
+ }
+
+ bw.close();
+ }
+
+ private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws Exception {
+
+ Connector conn = getConnector();
+
+ List<String> expected = new ArrayList<String>();
+ for (int i = 0; i < 100; i++) {
+ String row = String.format("%06d", i * 100 + 3);
+ expected.add(row + ":cf1:cq1:1");
+ }
+
+ if (!conn.tableOperations().exists(tableName)) {
+ Assert.assertFalse(shouldExist);
+
+ writeData(tableName, conn);
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ conn.tableOperations().flush(tableName, null, null, true);
+ }
+
+ verifyData(expected, conn.createScanner(tableName, Authorizations.EMPTY));
+
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ Scanner metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(metaScanner);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ int counts[] = new int[paths.length];
+
+ outer: for (Entry<Key,Value> entry : metaScanner) {
+ String cf = entry.getKey().getColumnFamily().toString();
+ String cq = entry.getKey().getColumnQualifier().toString();
+
+ String path;
+ if (cf.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME.toString()))
+ path = cq;
+ else
+ path = entry.getValue().toString();
+
+ for (int i = 0; i < paths.length; i++) {
+ if (path.startsWith(paths[i].toString())) {
+ counts[i]++;
+ continue outer;
+ }
+ }
+
+ Assert.fail("Unexpected volume " + path);
+ }
+
+ Instance i = conn.getInstance();
+ ZooReaderWriter zk = new ZooReaderWriter(i.getZooKeepers(), i.getZooKeepersSessionTimeOut(), "");
+ WalStateManager wals = new WalStateManager(i, zk);
+ outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+ for (Path path : paths) {
+ if (entry.getKey().toString().startsWith(path.toString())) {
+ continue outer;
+ }
+ }
+ Assert.fail("Unexpected volume " + entry.getKey());
+ }
+
+ // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
+ // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
+
+ int sum = 0;
+ for (int count : counts) {
+ Assert.assertTrue(count > 0);
+ sum += count;
+ }
+
+ Assert.assertEquals(200, sum);
+
+ }
+
+ @Test
+ public void testRemoveVolumes() throws Exception {
+ String[] tableNames = getUniqueNames(2);
+
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+ cluster.stop();
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v2.toString());
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // start cluster and verify that volume was decommisioned
+ cluster.start();
+
+ Connector conn = cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD));
+ conn.tableOperations().compact(tableNames[0], null, null, true, true);
+
+ verifyVolumesUsed(tableNames[0], true, v2);
+
+ // check that root tablet is not on volume 1
+ ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+ String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+ String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+ Assert.assertTrue(rootTabletDir.startsWith(v2.toString()));
+
+ conn.tableOperations().clone(tableNames[0], tableNames[1], true, new HashMap<String,String>(), new HashSet<String>());
+
+ conn.tableOperations().flush(MetadataTable.NAME, null, null, true);
+ conn.tableOperations().flush(RootTable.NAME, null, null, true);
+
+ verifyVolumesUsed(tableNames[0], true, v2);
+ verifyVolumesUsed(tableNames[1], true, v2);
+
+ }
+
+ private void testReplaceVolume(boolean cleanShutdown) throws Exception {
+ String[] tableNames = getUniqueNames(3);
+
+ verifyVolumesUsed(tableNames[0], false, v1, v2);
+
+ // write to 2nd table, but do not flush data to disk before shutdown
+ writeData(tableNames[1], cluster.getConnector("root", new PasswordToken(ROOT_PASSWORD)));
+
+ if (cleanShutdown)
+ Assert.assertEquals(0, cluster.exec(Admin.class, "stopAll").waitFor());
+
+ cluster.stop();
+
+ File v1f = new File(v1.toUri());
+ File v8f = new File(new File(v1.getParent().toUri()), "v8");
+ Assert.assertTrue("Failed to rename " + v1f + " to " + v8f, v1f.renameTo(v8f));
+ Path v8 = new Path(v8f.toURI());
+
+ File v2f = new File(v2.toUri());
+ File v9f = new File(new File(v2.getParent().toUri()), "v9");
+ Assert.assertTrue("Failed to rename " + v2f + " to " + v9f, v2f.renameTo(v9f));
+ Path v9 = new Path(v9f.toURI());
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(new Path(cluster.getConfig().getConfDir().toURI().toString(), "accumulo-site.xml"));
+
+ conf.set(Property.INSTANCE_VOLUMES.getKey(), v8 + "," + v9);
+ conf.set(Property.INSTANCE_VOLUMES_REPLACEMENTS.getKey(), v1 + " " + v8 + "," + v2 + " " + v9);
+ BufferedOutputStream fos = new BufferedOutputStream(new FileOutputStream(new File(cluster.getConfig().getConfDir(), "accumulo-site.xml")));
+ conf.writeXml(fos);
+ fos.close();
+
+ // start cluster and verify that volumes were replaced
+ cluster.start();
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+ // verify writes to new dir
+ getConnector().tableOperations().compact(tableNames[0], null, null, true, true);
+ getConnector().tableOperations().compact(tableNames[1], null, null, true, true);
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+
+ // check that root tablet is not on volume 1 or 2
+ ZooReader zreader = new ZooReader(cluster.getZooKeepers(), 30000);
+ String zpath = ZooUtil.getRoot(new ZooKeeperInstance(cluster.getClientConfig())) + RootTable.ZROOT_TABLET_PATH;
+ String rootTabletDir = new String(zreader.getData(zpath, false, null), UTF_8);
+ Assert.assertTrue(rootTabletDir.startsWith(v8.toString()) || rootTabletDir.startsWith(v9.toString()));
+
+ getConnector().tableOperations().clone(tableNames[1], tableNames[2], true, new HashMap<String,String>(), new HashSet<String>());
+
+ getConnector().tableOperations().flush(MetadataTable.NAME, null, null, true);
+ getConnector().tableOperations().flush(RootTable.NAME, null, null, true);
+
+ verifyVolumesUsed(tableNames[0], true, v8, v9);
+ verifyVolumesUsed(tableNames[1], true, v8, v9);
+ verifyVolumesUsed(tableNames[2], true, v8, v9);
+ }
+
+ @Test
+ public void testCleanReplaceVolumes() throws Exception {
+ testReplaceVolume(true);
+ }
+
+ @Test
+ public void testDirtyReplaceVolumes() throws Exception {
+ testReplaceVolume(false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/081eb1fa/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
index 440d2cf,0000000..9ce221a
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BinaryStressIT.java
@@@ -1,107 -1,0 +1,107 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BinaryStressIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 4 * 60;
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "3s");
++ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_MAXMEM, "50K");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "0");
+ }
+
+ private String majcDelay, maxMem;
+
+ @Before
+ public void alterConfig() throws Exception {
+ if (ClusterType.MINI == getClusterType()) {
+ return;
+ }
+
+ InstanceOperations iops = getConnector().instanceOperations();
+ Map<String,String> conf = iops.getSystemConfiguration();
+ majcDelay = conf.get(Property.TSERV_MAJC_DELAY.getKey());
+ maxMem = conf.get(Property.TSERV_MAXMEM.getKey());
+
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "0");
+ iops.setProperty(Property.TSERV_MAXMEM.getKey(), "50K");
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+
+ @After
+ public void resetConfig() throws Exception {
+ if (null != majcDelay) {
+ InstanceOperations iops = getConnector().instanceOperations();
+ iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), majcDelay);
+ iops.setProperty(Property.TSERV_MAXMEM.getKey(), maxMem);
+
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+ }
+ }
+
+ @Test
+ public void binaryStressTest() throws Exception {
+ Connector c = getConnector();
+ String tableName = getUniqueNames(1)[0];
+ c.tableOperations().create(tableName);
+ c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+ BinaryIT.runTest(c, tableName);
+ String id = c.tableOperations().tableIdMap().get(tableName);
+ Set<Text> tablets = new HashSet<>();
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(Range.prefix(id));
+ for (Entry<Key,Value> entry : s) {
+ tablets.add(entry.getKey().getRow());
+ }
+ assertTrue("Expected at least 8 tablets, saw " + tablets.size(), tablets.size() > 7);
+ }
+
+}