You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2017/01/03 20:55:54 UTC
[5/7] accumulo-testing git commit: ACCUMULO-4510 Adding Randomwalk
code from Accumulo
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java
new file mode 100644
index 0000000..87a48f9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Merge.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that merges a generated row range of one randomly chosen table. The metadata
+ * table is added to the list of candidates before the choice is made.
+ */
+public class Merge extends Test {
+
+  /**
+   * Chooses a table, obtains a range from {@code ConcurrentFixture.generateRange} and merges it.
+   * An offline or missing table is reported at debug level and is not treated as a failure.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    final Connector connector = env.getConnector();
+    final Random random = (Random) state.get("rand");
+
+    // Copy the list taken from the state so that appending the metadata table leaves it untouched
+    @SuppressWarnings("unchecked")
+    final List<String> candidates = new ArrayList<>((List<String>) state.get("tables"));
+    candidates.add(MetadataTable.NAME);
+    final String target = candidates.get(random.nextInt(candidates.size()));
+
+    final List<Text> bounds = ConcurrentFixture.generateRange(random);
+    final Text start = bounds.get(0);
+    final Text end = bounds.get(1);
+
+    try {
+      connector.tableOperations().merge(target, start, end);
+      log.debug("merged " + target + " from " + start + " to " + end);
+    } catch (TableOfflineException offline) {
+      log.debug("merge " + target + " from " + start + " to " + end + " failed, table is not online");
+    } catch (TableNotFoundException missing) {
+      log.debug("merge " + target + " from " + start + " to " + end + " failed, doesnt exist");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java
new file mode 100644
index 0000000..fd01d98
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/OfflineTable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Randomwalk node that takes one randomly chosen table offline, pauses for a random time below
+ * 200 milliseconds, and then brings the same table back online.
+ */
+public class OfflineTable extends Test {
+
+  /**
+   * Runs the offline/online cycle against a table drawn from the "tables" state entry. A table
+   * that no longer exists is reported at debug level and is not treated as a failure.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    final Connector connector = env.getConnector();
+    final Random random = (Random) state.get("rand");
+
+    @SuppressWarnings("unchecked")
+    final List<String> candidates = (List<String>) state.get("tables");
+    final String target = candidates.get(random.nextInt(candidates.size()));
+
+    try {
+      // The boolean argument of each call is decided by a coin flip
+      final boolean offlineFlag = random.nextBoolean();
+      connector.tableOperations().offline(target, offlineFlag);
+      log.debug("Offlined " + target);
+
+      final int pauseMillis = random.nextInt(200);
+      sleepUninterruptibly(pauseMillis, TimeUnit.MILLISECONDS);
+
+      final boolean onlineFlag = random.nextBoolean();
+      connector.tableOperations().online(target, onlineFlag);
+      log.debug("Onlined " + target);
+    } catch (TableNotFoundException missing) {
+      log.debug("offline or online failed " + target + ", doesnt exist");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java
new file mode 100644
index 0000000..dab41bf
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameNamespace.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.NamespaceExistsException;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that renames one randomly chosen namespace to the name of another randomly
+ * chosen namespace. Both names are drawn from the "namespaces" state entry.
+ */
+public class RenameNamespace extends Test {
+
+  /**
+   * Attempts the rename and logs the outcome at debug level. A missing source namespace or an
+   * already existing destination is logged and is not treated as a failure.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    final Connector connector = env.getConnector();
+    final Random random = (Random) state.get("rand");
+
+    @SuppressWarnings("unchecked")
+    final List<String> known = (List<String>) state.get("namespaces");
+
+    // Two independent draws, so source and destination can turn out to be the same name
+    final int fromIndex = random.nextInt(known.size());
+    final String from = known.get(fromIndex);
+    final int toIndex = random.nextInt(known.size());
+    final String to = known.get(toIndex);
+
+    try {
+      connector.namespaceOperations().rename(from, to);
+      log.debug("Renamed namespace " + from + " " + to);
+    } catch (NamespaceExistsException exists) {
+      log.debug("Rename namespace " + from + " failed, " + to + " exists");
+    } catch (NamespaceNotFoundException missing) {
+      log.debug("Rename namespace " + from + " failed, doesn't exist");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java
new file mode 100644
index 0000000..4c5a52f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/RenameTable.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that renames one randomly chosen table to the name of another randomly chosen
+ * table. Both names are drawn from the "tables" state entry, so they may carry different
+ * namespace prefixes (the part before the first '.').
+ */
+public class RenameTable extends Test {
+
+  /**
+   * Attempts the rename. Every handled failure is logged at debug level and ends the visit. Only
+   * a rename that actually succeeded is checked against the namespaces of the two names: if they
+   * differ, an error is logged because such a rename was expected to be rejected.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    Connector conn = env.getConnector();
+
+    Random rand = (Random) state.get("rand");
+
+    @SuppressWarnings("unchecked")
+    List<String> tableNames = (List<String>) state.get("tables");
+
+    String srcTableName = tableNames.get(rand.nextInt(tableNames.size()));
+    String newTableName = tableNames.get(rand.nextInt(tableNames.size()));
+
+    String srcNamespace = namespaceOf(srcTableName);
+    String newNamespace = namespaceOf(newTableName);
+
+    try {
+      conn.tableOperations().rename(srcTableName, newTableName);
+      log.debug("Renamed table " + srcTableName + " " + newTableName);
+    } catch (TableExistsException e) {
+      log.debug("Rename " + srcTableName + " failed, " + newTableName + " exists");
+      return;
+    } catch (TableNotFoundException e) {
+      Throwable cause = e.getCause();
+      // instanceof (rather than cause.getClass().isAssignableFrom(...)) so that a generic
+      // Exception/Throwable cause is not mistaken for a missing namespace
+      if (cause instanceof NamespaceNotFoundException) {
+        // Rename has to have failed on the destination namespace, because the source namespace
+        // couldn't be deleted with our table in it
+        log.debug("Rename failed because new namespace doesn't exist: " + newNamespace, cause);
+      } else {
+        log.debug("Rename " + srcTableName + " failed, doesnt exist");
+      }
+      return;
+    } catch (IllegalArgumentException e) {
+      log.debug("Rename: " + e.toString());
+      return;
+    } catch (AccumuloException e) {
+      // A failure when renaming a table into a different namespace is expected and stays silent
+      if (srcNamespace.equals(newNamespace)) {
+        log.debug("Rename " + srcTableName + " failed.", e);
+      }
+      return;
+    }
+
+    // Reached only when the rename succeeded
+    if (!srcNamespace.equals(newNamespace)) {
+      log.error("RenameTable operation should have failed when renaming across namespaces.");
+    }
+  }
+
+  // Returns the text before the first '.' of the table name, or "" when the name has no '.'
+  private static String namespaceOf(String tableName) {
+    int index = tableName.indexOf('.');
+    return -1 == index ? "" : tableName.substring(0, index);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java
new file mode 100644
index 0000000..189d743
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Replication.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import static org.apache.accumulo.core.conf.Property.MASTER_REPLICATION_SCAN_INTERVAL;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_NAME;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_PEERS;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_PASSWORD;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_USER;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_ASSIGNMENT_SLEEP;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_DELAY;
+import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_PERIOD;
+import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION;
+import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION_TARGET;
+import static org.apache.accumulo.server.replication.ReplicaSystemFactory.getPeerConfigurationValue;
+
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.io.Text;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class Replication extends Test {
+
+ final int ROWS = 1000;
+ final int COLS = 50;
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ final Connector c = env.getConnector();
+ final Instance inst = c.getInstance();
+ final String instName = inst.getInstanceName();
+ final InstanceOperations iOps = c.instanceOperations();
+ final TableOperations tOps = c.tableOperations();
+
+ // Replicate to ourselves
+ iOps.setProperty(REPLICATION_NAME.getKey(), instName);
+ iOps.setProperty(REPLICATION_PEERS.getKey() + instName, getPeerConfigurationValue(AccumuloReplicaSystem.class, instName + "," + inst.getZooKeepers()));
+ iOps.setProperty(REPLICATION_PEER_USER.getKey() + instName, env.getUserName());
+ iOps.setProperty(REPLICATION_PEER_PASSWORD.getKey() + instName, env.getPassword());
+ // Tweak some replication parameters to make the replication go faster
+ iOps.setProperty(MASTER_REPLICATION_SCAN_INTERVAL.getKey(), "1s");
+ iOps.setProperty(REPLICATION_WORK_ASSIGNMENT_SLEEP.getKey(), "1s");
+ iOps.setProperty(REPLICATION_WORK_PROCESSOR_DELAY.getKey(), "1s");
+ iOps.setProperty(REPLICATION_WORK_PROCESSOR_PERIOD.getKey(), "1s");
+
+ // Ensure the replication table is online
+ ReplicationTable.setOnline(c);
+ boolean online = ReplicationTable.isOnline(c);
+ for (int i = 0; i < 10; i++) {
+ if (online)
+ break;
+ sleepUninterruptibly(2, TimeUnit.SECONDS);
+ online = ReplicationTable.isOnline(c);
+ }
+ assertTrue("Replication table was not online", online);
+
+ // Make a source and destination table
+ final String sourceTable = ("repl-source-" + UUID.randomUUID()).replace('-', '_');
+ final String destTable = ("repl-dest-" + UUID.randomUUID()).replace('-', '_');
+ final String tables[] = new String[] {sourceTable, destTable};
+
+ for (String tableName : tables) {
+ log.debug("creating " + tableName);
+ tOps.create(tableName);
+ }
+
+ // Point the source to the destination
+ final String destID = tOps.tableIdMap().get(destTable);
+ tOps.setProperty(sourceTable, TABLE_REPLICATION.getKey(), "true");
+ tOps.setProperty(sourceTable, TABLE_REPLICATION_TARGET.getKey() + instName, destID);
+
+ // zookeeper propagation wait
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+
+ // Maybe split the tables
+ Random rand = new Random(System.currentTimeMillis());
+ for (String tableName : tables) {
+ if (rand.nextBoolean()) {
+ splitTable(tOps, tableName);
+ }
+ }
+
+ // write some checkable data
+ BatchWriter bw = c.createBatchWriter(sourceTable, null);
+ for (int row = 0; row < ROWS; row++) {
+ Mutation m = new Mutation(itos(row));
+ for (int col = 0; col < COLS; col++) {
+ m.put("", itos(col), "");
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+
+ // attempt to force the WAL to roll so replication begins
+ final Set<String> origRefs = c.replicationOperations().referencedFiles(sourceTable);
+ // write some data we will ignore
+ while (true) {
+ final Set<String> updatedFileRefs = c.replicationOperations().referencedFiles(sourceTable);
+ updatedFileRefs.retainAll(origRefs);
+ log.debug("updateFileRefs size " + updatedFileRefs.size());
+ if (updatedFileRefs.isEmpty()) {
+ break;
+ }
+ bw = c.createBatchWriter(sourceTable, null);
+ for (int row = 0; row < ROWS; row++) {
+ Mutation m = new Mutation(itos(row));
+ for (int col = 0; col < COLS; col++) {
+ m.put("ignored", itos(col), "");
+ }
+ bw.addMutation(m);
+ }
+ bw.close();
+ }
+
+ // wait a little while for replication to take place
+ sleepUninterruptibly(30, TimeUnit.SECONDS);
+
+ // check the data
+ Scanner scanner = c.createScanner(destTable, Authorizations.EMPTY);
+ scanner.fetchColumnFamily(new Text(""));
+ int row = 0;
+ int col = 0;
+ for (Entry<Key,Value> entry : scanner) {
+ assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString()));
+ assertEquals(col, Integer.parseInt(entry.getKey().getColumnQualifier().toString()));
+ col++;
+ if (col == COLS) {
+ row++;
+ col = 0;
+ }
+ }
+ assertEquals(ROWS, row);
+ assertEquals(0, col);
+
+ // cleanup
+ for (String tableName : tables) {
+ log.debug("Deleting " + tableName);
+ tOps.delete(tableName);
+ }
+ }
+
+ // junit isn't a dependency
+ private void assertEquals(int expected, int actual) {
+ if (expected != actual)
+ throw new RuntimeException(String.format("%d fails to match expected value %d", actual, expected));
+ }
+
+ // junit isn't a dependency
+ private void assertTrue(String string, boolean test) {
+ if (!test)
+ throw new RuntimeException(string);
+ }
+
+ private static String itos(int i) {
+ return String.format("%08d", i);
+ }
+
+ private void splitTable(TableOperations tOps, String tableName) throws Exception {
+ SortedSet<Text> splits = new TreeSet<>();
+ for (int i = 1; i <= 9; i++) {
+ splits.add(new Text(itos(i * (ROWS / 10))));
+ }
+ log.debug("Adding splits to " + tableName);
+ tOps.addSplits(tableName, splits);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java
new file mode 100644
index 0000000..ab89bea
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/ScanTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class ScanTable extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ Connector conn = env.getConnector();
+
+ Random rand = (Random) state.get("rand");
+
+ @SuppressWarnings("unchecked")
+ List<String> tableNames = (List<String>) state.get("tables");
+
+ String tableName = tableNames.get(rand.nextInt(tableNames.size()));
+
+ try {
+ Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ log.debug("Scanned " + tableName);
+ } catch (TableDeletedException e) {
+ log.debug("Scan " + tableName + " failed, table deleted");
+ } catch (TableNotFoundException e) {
+ log.debug("Scan " + tableName + " failed, doesnt exist");
+ } catch (TableOfflineException e) {
+ log.debug("Scan " + tableName + " failed, offline");
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof AccumuloSecurityException) {
+ log.debug("BatchScan " + tableName + " failed, permission error");
+ } else {
+ throw e;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java
new file mode 100644
index 0000000..164fd4f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Setup.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that prepares the shared state: a random number generator ("rand") and the
+ * lists of namespace, table and user names ("namespaces", "tables", "users") to work with.
+ */
+public class Setup extends Test {
+
+  /**
+   * Reads the "numTables" (default 9), "numNamespaces" (default 2) and "numUsers" (default 5)
+   * properties and generates the corresponding names. Tables are spread evenly over the default
+   * namespace and the generated namespaces; the default namespace additionally takes the
+   * remainder, so the number of table names always equals numTables.
+   *
+   * @throws IllegalArgumentException
+   *           if numTables or numNamespaces is negative
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    Random rand = new Random();
+    state.set("rand", rand);
+
+    int numTables = Integer.parseInt(props.getProperty("numTables", "9"));
+    int numNamespaces = Integer.parseInt(props.getProperty("numNamespaces", "2"));
+    log.debug("numTables = " + numTables);
+    log.debug("numNamespaces = " + numNamespaces);
+    if (numTables < 0 || numNamespaces < 0) {
+      throw new IllegalArgumentException("numTables and numNamespaces must not be negative: numTables=" + numTables + ", numNamespaces=" + numNamespaces);
+    }
+
+    List<String> tables = new ArrayList<>();
+    List<String> namespaces = new ArrayList<>();
+
+    for (int i = 0; i < numNamespaces; i++) {
+      namespaces.add(String.format("nspc_%03d", i));
+    }
+
+    // Even share per namespace; the "+ 1" accounts for the default namespace
+    int tablesPerNamespace = numTables / (numNamespaces + 1);
+    // The default namespace gets its share plus whatever the even split leaves over
+    int tablesInDefault = numTables - tablesPerNamespace * numNamespaces;
+
+    // Make tables in the default namespace
+    for (int i = 0; i < tablesInDefault; i++) {
+      tables.add(String.format("ctt_%03d", i));
+    }
+
+    // Make tables in each namespace
+    for (String n : namespaces) {
+      for (int i = 0; i < tablesPerNamespace; i++) {
+        tables.add(String.format("%s.ctt_%03d", n, i));
+      }
+    }
+
+    state.set("tables", tables);
+    state.set("namespaces", namespaces);
+
+    int numUsers = Integer.parseInt(props.getProperty("numUsers", "5"));
+    log.debug("numUsers = " + numUsers);
+    List<String> users = new ArrayList<>();
+    for (int i = 0; i < numUsers; i++) {
+      users.add(String.format("user%03d", i));
+    }
+    state.set("users", users);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java
new file mode 100644
index 0000000..dc2e670
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/Shutdown.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Randomwalk node that requests a clean stop of the instance and waits until neither tablet
+ * servers nor the master respond any more.
+ */
+public class Shutdown extends Test {
+
+  /**
+   * Sets the master goal state to CLEAN_STOP, polls once per second until no tablet servers are
+   * listed, polls once per second until the master stops answering, and finally pauses for ten
+   * seconds.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    log.info("shutting down");
+    SetGoalState.main(new String[] {MasterGoalState.CLEAN_STOP.name()});
+
+    while (!env.getConnector().instanceOperations().getTabletServers().isEmpty()) {
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+
+    while (masterResponds()) {
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+
+    log.info("servers stopped");
+    sleepUninterruptibly(10, TimeUnit.SECONDS);
+  }
+
+  // Returns true when the master answered a stats request; the connection used for the probe is
+  // handed back afterwards so that repeated polling does not accumulate open clients
+  private boolean masterResponds() {
+    Client client = null;
+    try {
+      AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
+      client = MasterClient.getConnection(context);
+      if (client == null) {
+        // NOTE(review): presumably getConnection yields null when no master can be reached;
+        // without this check that case would only surface as a NullPointerException below
+        return false;
+      }
+      client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+      return true;
+    } catch (Exception e) {
+      // assume this is due to server shutdown
+      return false;
+    } finally {
+      if (client != null) {
+        MasterClient.close(client);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java
new file mode 100644
index 0000000..df30487
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StartAll.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.master.thrift.MasterClientService.Client;
+import org.apache.accumulo.core.master.thrift.MasterGoalState;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.master.state.SetGoalState;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Randomwalk node that sets the master goal state to NORMAL, runs {@code $ACCUMULO_HOME/bin/start-all.sh} and waits until the master reports at least one
+ * tablet server.
+ */
+public class StartAll extends Test {
+
+  /**
+   * Starts the servers and polls the master once a second until its stats list a tablet server.
+   *
+   * @param state
+   *          shared randomwalk state (not used here)
+   * @param env
+   *          randomwalk environment (not used here)
+   * @param props
+   *          node properties (not used here)
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    log.info("Starting all servers");
+    SetGoalState.main(new String[] {MasterGoalState.NORMAL.name()});
+    Process exec = Runtime.getRuntime().exec(new String[] {System.getenv().get("ACCUMULO_HOME") + "/bin/start-all.sh"});
+    exec.waitFor();
+    while (true) {
+      Client client = null;
+      try {
+        AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance()));
+        client = MasterClient.getConnection(context);
+        MasterMonitorInfo masterStats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+        if (!masterStats.tServerInfo.isEmpty())
+          break;
+      } catch (Exception ex) {
+        // the master is expected to be unreachable for a while after start-up; retry after the pause below
+        log.debug("master not ready yet: " + ex);
+      } finally {
+        // hand the connection back on every iteration; otherwise each poll leaves one client behind
+        if (client != null)
+          MasterClient.close(client);
+      }
+      // pause after every unsuccessful poll, including "master is up but no tablet server has registered yet"
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java
new file mode 100644
index 0000000..8210dc4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/concurrent/StopTabletServer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.concurrent;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Randomwalk node that picks one registered tablet server at random and stops it with {@code accumulo admin stop}.
+ */
+public class StopTabletServer extends Test {
+
+  /**
+   * Reads the children of the instance's {@code ZTSERVERS} node in ZooKeeper and builds a server instance for each child that has at least one sub-node,
+   * using the first sub-node in sorted order. Entries whose data is the string "master" are skipped.
+   *
+   * @param instance
+   *          instance whose ZooKeeper quorum and root path are used
+   * @return the tablet servers found; empty when none are registered
+   */
+  Set<TServerInstance> getTServers(Instance instance) throws KeeperException, InterruptedException {
+    Set<TServerInstance> found = new HashSet<>();
+    ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+    String tserversPath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+    for (String server : reader.getChildren(tserversPath)) {
+      String serverPath = tserversPath + "/" + server;
+      try {
+        List<String> entries = reader.getChildren(serverPath);
+        if (entries.isEmpty())
+          continue;
+        Collections.sort(entries);
+        Stat stat = new Stat();
+        byte[] data = reader.getData(serverPath + "/" + entries.get(0), stat);
+        if ("master".equals(new String(data, UTF_8)))
+          continue;
+        found.add(new TServerInstance(AddressUtil.parseAddress(server, false), stat.getEphemeralOwner()));
+      } catch (KeeperException.NoNodeException ex) {
+        // someone beat us to it
+      }
+    }
+    return found;
+  }
+
+  /**
+   * Stops one randomly chosen tablet server, but only when more than one is registered, and fails if the command exits non-zero or the server is still
+   * registered afterwards.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+
+    Instance instance = env.getInstance();
+
+    List<TServerInstance> candidates = new ArrayList<>(getTServers(instance));
+    Collections.shuffle(candidates);
+    Runtime runtime = Runtime.getRuntime();
+    if (candidates.size() <= 1)
+      return;
+
+    TServerInstance victim = candidates.get(0);
+    log.info("Stopping " + victim.hostPort());
+    String[] command = {System.getenv("ACCUMULO_HOME") + "/bin/accumulo", "admin", "stop", victim.hostPort()};
+    Process stop = runtime.exec(command);
+    int exitCode = stop.waitFor();
+    if (exitCode != 0)
+      throw new RuntimeException("admin stop returned a non-zero response: " + exitCode);
+    if (getTServers(instance).contains(victim))
+      throw new RuntimeException("Failed to stop " + victim);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java
new file mode 100644
index 0000000..b0aa7e1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Compact.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that compacts the table named in the state between two randomly chosen bank rows.
+ */
+public class Compact extends Test {
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String tableName = state.getString("tableName");
+    Random random = (Random) state.get("rand");
+    Connector connector = env.getConnector();
+
+    Text first = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+    Text second = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+
+    // when the two rows are equal or out of order, both bounds are passed as null instead
+    boolean ordered = first.compareTo(second) < 0;
+    Text startRow = ordered ? first : null;
+    Text endRow = ordered ? second : null;
+
+    log.debug("compacting " + startRow + " " + endRow);
+    connector.tableOperations().compact(tableName, startRow, endRow, random.nextBoolean(), random.nextBoolean());
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java
new file mode 100644
index 0000000..2c5448d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Flush.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that flushes the table named in the state between two randomly chosen bank rows.
+ */
+public class Flush extends Test {
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String tableName = state.getString("tableName");
+    Random random = (Random) state.get("rand");
+    Connector connector = env.getConnector();
+
+    Text first = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+    Text second = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+
+    // when the two rows are equal or out of order, both bounds are passed as null instead
+    boolean ordered = first.compareTo(second) < 0;
+    Text startRow = ordered ? first : null;
+    Text endRow = ordered ? second : null;
+
+    log.debug("flushing " + startRow + " " + endRow);
+    connector.tableOperations().flush(tableName, startRow, endRow, random.nextBoolean());
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java
new file mode 100644
index 0000000..50a1e52
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Init.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that pre-splits the bank table and writes every account of every bank with a balance of "100" and sequence number 0, using conditional
+ * mutations that require the account's "seq" column to be absent.
+ */
+public class Init extends Test {
+
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+
+    int numBanks = (Integer) state.get("numBanks");
+    int numAccts = (Integer) state.get("numAccts");
+
+    // add some splits to spread ingest out a little: one at each tenth of the bank range
+    TreeSet<Text> splits = new TreeSet<>();
+    for (int tenth = 1; tenth <= 9; tenth++) {
+      splits.add(new Text(Utils.getBank((int) (numBanks * .1 * tenth))));
+    }
+    env.getConnector().tableOperations().addSplits((String) state.get("tableName"), splits);
+    log.debug("Added splits " + splits);
+
+    ArrayList<Integer> bankOrder = new ArrayList<>();
+    for (int bank = 0; bank < numBanks; bank++) {
+      bankOrder.add(bank);
+    }
+    // shuffle for case when multiple threads are adding banks
+    Collections.shuffle(bankOrder, (Random) state.get("rand"));
+
+    ConditionalWriter cw = (ConditionalWriter) state.get("cw");
+
+    for (int bank : bankOrder) {
+      String bankRow = Utils.getBank(bank);
+      ConditionalMutation pending = new ConditionalMutation(bankRow);
+      int acceptedCount = 0;
+
+      for (int acct = 0; acct < numAccts; acct++) {
+        String cf = Utils.getAccount(acct);
+        pending.addCondition(new Condition(cf, "seq"));
+        pending.put(cf, "bal", "100");
+        pending.put(cf, "seq", Utils.getSeq(0));
+
+        // write out the batch at every non-zero multiple of 1000 and start a new mutation for the same bank
+        if (acct > 0 && acct % 1000 == 0) {
+          if (writeUntilKnown(cw, pending) == Status.ACCEPTED)
+            acceptedCount++;
+          pending = new ConditionalMutation(bankRow);
+        }
+      }
+
+      // write whatever is left over from the last batch
+      if (!pending.getConditions().isEmpty()) {
+        if (writeUntilKnown(cw, pending) == Status.ACCEPTED)
+          acceptedCount++;
+      }
+
+      log.debug("Added bank " + bankRow + " " + acceptedCount);
+    }
+
+  }
+
+  /**
+   * Writes the mutation, rewriting it for as long as the reported status is UNKNOWN, and returns the first status that is not UNKNOWN.
+   */
+  private static Status writeUntilKnown(ConditionalWriter writer, ConditionalMutation mutation) throws Exception {
+    Status status = writer.write(mutation).getStatus();
+    while (status == Status.UNKNOWN) {
+      status = writer.write(mutation).getStatus();
+    }
+    return status;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java
new file mode 100644
index 0000000..2f5d52b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Merge.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that merges the tablets of the table named in the state between two randomly chosen bank rows.
+ */
+public class Merge extends Test {
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String tableName = state.getString("tableName");
+    Random random = (Random) state.get("rand");
+    Connector connector = env.getConnector();
+
+    Text first = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+    Text second = new Text(Utils.getBank(random.nextInt((Integer) state.get("numBanks"))));
+
+    // when the two rows are equal or out of order, both bounds are passed as null instead
+    boolean ordered = first.compareTo(second) < 0;
+    Text startRow = ordered ? first : null;
+    Text endRow = ordered ? second : null;
+
+    log.debug("merging " + startRow + " " + endRow);
+    connector.tableOperations().merge(tableName, startRow, endRow);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java
new file mode 100644
index 0000000..1e4ad01
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Setup.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that seeds the shared state for the conditional-writer test: a {@link Random}, the bank and account counts, the table name and a
+ * {@link ConditionalWriter} for that table. The table is created if it does not exist yet.
+ */
+public class Setup extends Test {
+
+  /**
+   * Populates the state keys "rand", "numBanks", "numAccts", "tableName" and "cw".
+   *
+   * @param state
+   *          shared randomwalk state that receives the keys listed above
+   * @param env
+   *          supplies the connector used to create the table and the conditional writer
+   * @param props
+   *          may carry "numBanks" (default 1000) and "numAccts" (default 10000)
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    Random rand = new Random();
+    state.set("rand", rand);
+
+    int numBanks = Integer.parseInt(props.getProperty("numBanks", "1000"));
+    log.debug("numBanks = " + numBanks);
+    state.set("numBanks", numBanks);
+
+    int numAccts = Integer.parseInt(props.getProperty("numAccts", "10000"));
+    log.debug("numAccts = " + numAccts);
+    state.set("numAccts", numAccts);
+
+    String tableName = "banks";
+    state.set("tableName", tableName);
+
+    try {
+      env.getConnector().tableOperations().create(tableName);
+      log.debug("created table " + tableName);
+      boolean blockCache = rand.nextBoolean();
+      env.getConnector().tableOperations().setProperty(tableName, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), blockCache + "");
+      log.debug("set " + Property.TABLE_BLOCKCACHE_ENABLED.getKey() + " " + blockCache);
+    } catch (TableExistsException tee) {
+      // an existing table is reused as-is, so this is not an error; record it instead of hiding it
+      log.debug("table " + tableName + " already exists, reusing it");
+    }
+
+    ConditionalWriter cw = env.getConnector().createConditionalWriter(tableName, new ConditionalWriterConfig().setMaxWriteThreads(1));
+    state.set("cw", cw);
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java
new file mode 100644
index 0000000..8ea9aab
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Split.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that adds one split point, at a randomly chosen bank row, to the table named in the state.
+ */
+public class Split extends Test {
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String tableName = state.getString("tableName");
+    Random random = (Random) state.get("rand");
+    Connector connector = env.getConnector();
+    String splitRow = Utils.getBank(random.nextInt((Integer) state.get("numBanks")));
+
+    log.debug("adding split " + splitRow);
+    TreeSet<Text> splitPoints = new TreeSet<>();
+    splitPoints.add(new Text(splitRow));
+    connector.tableOperations().addSplits(tableName, splitPoints);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java
new file mode 100644
index 0000000..cf72607
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/TearDown.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that closes the {@link ConditionalWriter} stored in the state under "cw".
+ */
+public class TearDown extends Test {
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    Object writer = state.get("cw");
+    ((ConditionalWriter) writer).close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java
new file mode 100644
index 0000000..73a7d91
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Transfer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that moves a random amount between two accounts of one bank. Both balances and sequence numbers are read with an isolated scan and then
+ * rewritten in a single conditional mutation that only applies if both sequence numbers are unchanged.
+ */
+public class Transfer extends Test {
+
+  /** Sequence number and balance read for one account; both stay 0 when the scan returns no entry for the account. */
+  private static class Account {
+    int seq;
+    int bal;
+
+    void setBal(String s) {
+      bal = Integer.parseInt(s);
+    }
+
+    void setSeq(String s) {
+      seq = Integer.parseInt(s);
+    }
+
+    @Override
+    public String toString() {
+      return seq + " " + bal;
+    }
+  }
+
+  /**
+   * Picks a bank and two distinct accounts, reads their current state and attempts the transfer when the source balance covers the amount.
+   *
+   * @param state
+   *          must hold "tableName", "rand", "numBanks", "numAccts" and "cw"
+   * @param env
+   *          supplies the connector used for the scan
+   * @param props
+   *          node properties (not used here)
+   * @throws IllegalStateException
+   *           if fewer than two accounts are configured, or the scan returns a column family or qualifier that was not requested
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String table = state.getString("tableName");
+    Random rand = (Random) state.get("rand");
+    Connector conn = env.getConnector();
+
+    int numAccts = (Integer) state.get("numAccts");
+    if (numAccts < 2) {
+      // with a single account the re-pick loop below could never find a second, different account
+      throw new IllegalStateException("need at least 2 accounts to transfer, numAccts = " + numAccts);
+    }
+    // note: non integer exponents are slow
+
+    // ZipfDistribution samples ranks in 1..N, while Init writes banks 0..numBanks-1 and accounts 0..numAccts-1,
+    // so each sampled rank is shifted down by one to land on an id that exists
+    ZipfDistribution zdiBanks = new ZipfDistribution((Integer) state.get("numBanks"), 1);
+    String bank = Utils.getBank(zdiBanks.inverseCumulativeProbability(rand.nextDouble()) - 1);
+    ZipfDistribution zdiAccts = new ZipfDistribution(numAccts, 1);
+    String acct1 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()) - 1);
+    String acct2 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble()) - 1);
+    while (acct2.equals(acct1)) {
+      // intentionally not using zipf distribution to pick on retry
+      acct2 = Utils.getAccount(rand.nextInt(numAccts));
+    }
+
+    // TODO document how data should be read when using ConditionalWriter
+    try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) {
+
+      scanner.setRange(new Range(bank));
+      scanner.fetchColumnFamily(new Text(acct1));
+      scanner.fetchColumnFamily(new Text(acct2));
+
+      Account a1 = new Account();
+      Account a2 = new Account();
+      Account a;
+
+      for (Entry<Key,Value> entry : scanner) {
+        String cf = entry.getKey().getColumnFamilyData().toString();
+        String cq = entry.getKey().getColumnQualifierData().toString();
+
+        if (cf.equals(acct1))
+          a = a1;
+        else if (cf.equals(acct2))
+          a = a2;
+        else
+          throw new IllegalStateException("Unexpected column fam: " + cf);
+
+        if (cq.equals("bal"))
+          a.setBal(entry.getValue().toString());
+        else if (cq.equals("seq"))
+          a.setSeq(entry.getValue().toString());
+        else
+          throw new IllegalStateException("Unexpected column qual: " + cq);
+      }
+
+      int amt = rand.nextInt(50);
+
+      log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2);
+
+      if (a1.bal >= amt) {
+        // the mutation only applies if neither account's sequence number changed since the scan
+        ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)),
+            new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq)));
+        cm.put(acct1, "bal", (a1.bal - amt) + "");
+        cm.put(acct2, "bal", (a2.bal + amt) + "");
+        cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1));
+        cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1));
+
+        ConditionalWriter cw = (ConditionalWriter) state.get("cw");
+        Status status = cw.write(cm).getStatus();
+        while (status == Status.UNKNOWN) {
+          log.debug("retrying transfer " + status);
+          status = cw.write(cm).getStatus();
+        }
+        log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java
new file mode 100644
index 0000000..5436c22
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Utils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+/**
+ * String-formatting helpers for the conditional randomwalk tests. Each method renders an integer as a zero-padded identifier.
+ */
+public class Utils {
+
+  /**
+   * Renders {@code value} as a decimal string left-padded with zeros to at least {@code width} characters.
+   */
+  private static String zeroPad(int value, int width) {
+    return String.format("%0" + width + "d", value);
+  }
+
+  /**
+   * Returns the bank identifier for {@code b}: the letter {@code b} followed by the number padded to three digits, e.g. {@code b007}.
+   */
+  static String getBank(int b) {
+    return "b" + zeroPad(b, 3);
+  }
+
+  /**
+   * Returns the account identifier for {@code a}: {@code acct} followed by the number padded to six digits, e.g. {@code acct000042}.
+   */
+  static String getAccount(int a) {
+    return "acct" + zeroPad(a, 6);
+  }
+
+  /**
+   * Returns the sequence number {@code s} padded to six digits, e.g. {@code 000003}.
+   */
+  static String getSeq(int s) {
+    return zeroPad(s, 6);
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Verify.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Verify.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Verify.java
new file mode 100644
index 0000000..fa516f1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/conditional/Verify.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.conditional;
+
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.ColumnSliceFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+/**
+ * Randomwalk node that scans every bank row of the conditional test table and checks that the balances in it still add up to the expected total.
+ */
+public class Verify extends Test {
+
+  /**
+   * Reads the table name and the account count from {@code state}, then verifies one bank row per index below the {@code numBanks} state value.
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+    String tableName = state.getString("tableName");
+    Connector connector = env.getConnector();
+
+    int accountsPerBank = (Integer) state.get("numAccts");
+
+    int bank = 0;
+    while (bank < (Integer) state.get("numBanks")) {
+      verifyBank(tableName, connector, Utils.getBank(bank), accountsPerBank);
+      bank++;
+    }
+
+  }
+
+  /**
+   * Sums every balance value found in {@code row} and fails if the row has entries whose total differs from {@code numAccts * 100}.
+   *
+   * @param table
+   *          table to scan
+   * @param conn
+   *          connector used to create the scanner
+   * @param row
+   *          bank row to verify
+   * @param numAccts
+   *          number of accounts expected in the bank
+   * @throws Exception
+   *           if the row is non-empty and its balances do not sum to the expected total
+   */
+  private void verifyBank(String table, Connector conn, String row, int numAccts) throws TableNotFoundException, Exception {
+    log.debug("Verifying bank " + row);
+
+    int entries = 0;
+    int total = 0;
+    int lowest = Integer.MAX_VALUE;
+    int highest = Integer.MIN_VALUE;
+
+    // TODO do not use IsolatedScanner, just enable isolation on scanner
+    try (Scanner balances = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) {
+
+      balances.setRange(new Range(row));
+
+      // replace any existing scan iterators with a filter for the inclusive slice "bal".."bal"
+      IteratorSetting balanceFilter = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class);
+      ColumnSliceFilter.setSlice(balanceFilter, "bal", true, "bal", true);
+      balances.clearScanIterators();
+      balances.addScanIterator(balanceFilter);
+
+      for (Entry<Key,Value> cell : balances) {
+        int balance = Integer.parseInt(cell.getValue().toString());
+        total += balance;
+        highest = Math.max(highest, balance);
+        lowest = Math.min(lowest, balance);
+        entries++;
+      }
+
+    }
+
+    // presumably every account starts with a balance of 100 -- confirm against the code that populates the table
+    int expectedTotal = numAccts * 100;
+    if (entries > 0 && total != expectedTotal) {
+      throw new Exception("Sum is off " + total);
+    }
+
+    log.debug("Verified " + row + " count = " + entries + " sum = " + total + " min = " + lowest + " max = " + highest);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java
new file mode 100644
index 0000000..09774ff
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/Commit.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.image;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class Commit extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ env.getMultiTableBatchWriter().flush();
+
+ log.debug("Committed " + state.getLong("numWrites") + " writes. Total writes: " + state.getLong("totalWrites"));
+ state.set("numWrites", Long.valueOf(0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java
new file mode 100644
index 0000000..687b2d1
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ImageFixture.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.image;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.Fixture;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Fixture for the image randomwalk test: creates a uniquely named image table and index table before the walk and deletes them afterwards.
+ */
+public class ImageFixture extends Fixture {
+
+  // names of the tables created in setUp; tearDown deletes them again
+  String imageTableName;
+  String indexTableName;
+
+  /**
+   * Creates the image and index tables, pre-splits the image table, optionally configures locality groups on it, and initializes the counters kept in
+   * {@code state}.
+   *
+   * @throws TableExistsException
+   *           if either generated table name is already taken
+   */
+  @Override
+  public void setUp(State state, Environment env) throws Exception {
+
+    Connector conn = env.getConnector();
+    Instance instance = env.getInstance();
+
+    // 255 split points: "0100", "0200", ... "ff00"
+    SortedSet<Text> splits = new TreeSet<>();
+    for (int i = 1; i < 256; i++) {
+      splits.add(new Text(String.format("%04x", i << 8)));
+    }
+
+    // host name, pid and current time are combined to form the table names; '-' and '.' in the host name become '_'
+    String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
+    String pid = env.getPid();
+
+    imageTableName = String.format("img_%s_%s_%d", hostname, pid, System.currentTimeMillis());
+    state.set("imageTableName", imageTableName);
+
+    indexTableName = String.format("img_ndx_%s_%s_%d", hostname, pid, System.currentTimeMillis());
+    state.set("indexTableName", indexTableName);
+
+    try {
+      conn.tableOperations().create(imageTableName);
+      conn.tableOperations().addSplits(imageTableName, splits);
+      log.debug("Created table " + imageTableName + " (id:" + Tables.getNameToIdMap(instance).get(imageTableName) + ")");
+    } catch (TableExistsException e) {
+      log.error("Table " + imageTableName + " already exists.");
+      throw e;
+    }
+
+    try {
+      conn.tableOperations().create(indexTableName);
+      log.debug("Created table " + indexTableName + " (id:" + Tables.getNameToIdMap(instance).get(indexTableName) + ")");
+    } catch (TableExistsException e) {
+      // report the index table here: it is the table whose creation just failed
+      log.error("Table " + indexTableName + " already exists.");
+      throw e;
+    }
+
+    // configure locality groups on the image table in about half of the runs
+    Random rand = new Random();
+    if (rand.nextInt(10) < 5) {
+      // setup locality groups
+      Map<String,Set<Text>> groups = getLocalityGroups();
+
+      conn.tableOperations().setLocalityGroups(imageTableName, groups);
+      log.debug("Configured locality groups for " + imageTableName + " groups = " + groups);
+    }
+
+    state.set("numWrites", Long.valueOf(0));
+    state.set("totalWrites", Long.valueOf(0));
+    state.set("verified", Integer.valueOf(0));
+    state.set("lastIndexRow", new Text(""));
+  }
+
+  /**
+   * Builds the locality group layout used for the image table: group {@code lg1} holds the content column family and group {@code lg2} holds the meta column
+   * family.
+   *
+   * @return a new, mutable map from group name to the column families in that group
+   */
+  static Map<String,Set<Text>> getLocalityGroups() {
+    Map<String,Set<Text>> groups = new HashMap<>();
+
+    HashSet<Text> lg1 = new HashSet<>();
+    lg1.add(Write.CONTENT_COLUMN_FAMILY);
+    groups.put("lg1", lg1);
+
+    HashSet<Text> lg2 = new HashSet<>();
+    lg2.add(Write.META_COLUMN_FAMILY);
+    groups.put("lg2", lg2);
+    return groups;
+  }
+
+  /**
+   * Closes and resets the multi-table batch writer if one was initialized, then deletes both tables and logs the final {@code totalWrites} counter.
+   */
+  @Override
+  public void tearDown(State state, Environment env) throws Exception {
+    // We have resources we need to clean up
+    if (env.isMultiTableBatchWriterInitialized()) {
+      MultiTableBatchWriter mtbw = env.getMultiTableBatchWriter();
+      try {
+        mtbw.close();
+      } catch (MutationsRejectedException e) {
+        // deliberate best effort: rejected mutations must not prevent the tables from being dropped
+        log.error("Ignoring mutations that weren't flushed", e);
+      }
+
+      // Reset the MTBW on the state to null
+      env.resetMultiTableBatchWriter();
+    }
+
+    // Now we can safely delete the tables
+    log.debug("Dropping tables: " + imageTableName + " " + indexTableName);
+
+    Connector conn = env.getConnector();
+
+    conn.tableOperations().delete(imageTableName);
+    conn.tableOperations().delete(indexTableName);
+
+    log.debug("Final total of writes: " + state.getLong("totalWrites"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java
new file mode 100644
index 0000000..dbd89e8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/ScanMeta.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.image;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Randomwalk node that cross-checks the image table against its index table.
+ */
+public class ScanMeta extends Test {
+
+  /**
+   * Picks a random starting row, collects a bounded number of SHA1 values from the image table's meta column, then looks each one up in the index table
+   * and fails if the two views disagree.
+   *
+   * @param props
+   *          must define the integer properties {@code minScan} and {@code maxScan}, which bound the number of entries read from the image table
+   * @throws Exception
+   *           if the hash-to-row mapping read from the image table differs from the mapping found in the index table
+   */
+  @Override
+  public void visit(State state, Environment env, Properties props) throws Exception {
+
+    // scan just the metadata of the images table to find N hashes... use the batch scanner to lookup those N hashes in the index table
+    // this scan will test locality groups....
+
+    String indexTableName = state.getString("indexTableName");
+    String imageTableName = state.getString("imageTableName");
+
+    String uuid = UUID.randomUUID().toString();
+
+    Connector conn = env.getConnector();
+
+    // parse the configuration before opening any scanner so that a bad property cannot leak one
+    int minScan = Integer.parseInt(props.getProperty("minScan"));
+    int maxScan = Integer.parseInt(props.getProperty("maxScan"));
+
+    // NOTE(review): Random.nextInt rejects a non-positive bound, so maxScan must be strictly greater than minScan
+    Random rand = new Random();
+    int numToScan = rand.nextInt(maxScan - minScan) + minScan;
+
+    // hash (cell value) -> image table row
+    Map<Text,Text> hashes = new HashMap<>();
+
+    // try-with-resources closes the scanner even when the scan fails
+    try (Scanner imageScanner = conn.createScanner(imageTableName, Authorizations.EMPTY)) {
+      imageScanner.setRange(new Range(new Text(uuid), null));
+      imageScanner.fetchColumn(Write.META_COLUMN_FAMILY, Write.SHA1_COLUMN_QUALIFIER);
+
+      Iterator<Entry<Key,Value>> iter = imageScanner.iterator();
+
+      // test the remaining budget first so no further entry is requested once enough have been read
+      while (numToScan > 0 && iter.hasNext()) {
+        Entry<Key,Value> entry = iter.next();
+        hashes.put(new Text(entry.getValue().get()), entry.getKey().getRow());
+        numToScan--;
+      }
+    }
+
+    log.debug("Found " + hashes.size() + " hashes starting at " + uuid);
+
+    if (hashes.isEmpty()) {
+      return;
+    }
+
+    // use batch scanner to verify all of these exist in index
+    ArrayList<Range> ranges = new ArrayList<>(hashes.size());
+    for (Text row : hashes.keySet()) {
+      ranges.add(new Range(row));
+    }
+
+    // index row -> cell value, expected to mirror the hashes map
+    Map<Text,Text> hashes2 = new HashMap<>();
+
+    // the batch scanner is closed on every path, including a failed lookup
+    try (BatchScanner indexScanner = conn.createBatchScanner(indexTableName, Authorizations.EMPTY, 3)) {
+      indexScanner.setRanges(ranges);
+
+      for (Entry<Key,Value> entry : indexScanner) {
+        hashes2.put(entry.getKey().getRow(), new Text(entry.getValue().get()));
+      }
+    }
+
+    log.debug("Looked up " + ranges.size() + " ranges, found " + hashes2.size());
+
+    if (!hashes.equals(hashes2)) {
+      log.error("uuids from doc table : " + hashes.values());
+      log.error("uuids from index : " + hashes2.values());
+      throw new Exception("Mismatch between document table and index " + indexTableName + " " + imageTableName);
+    }
+
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java
new file mode 100644
index 0000000..1d14a90
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/image/TableOp.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.image;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+public class TableOp extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ // choose a table
+ Random rand = new Random();
+ String tableName;
+ if (rand.nextInt(10) < 8) {
+ tableName = state.getString("imageTableName");
+ } else {
+ tableName = state.getString("indexTableName");
+ }
+
+ // check if chosen table exists
+ Connector conn = env.getConnector();
+ TableOperations tableOps = conn.tableOperations();
+ if (tableOps.exists(tableName) == false) {
+ log.error("Table " + tableName + " does not exist!");
+ return;
+ }
+
+ // choose a random action
+ int num = rand.nextInt(10);
+ if (num > 6) {
+ log.debug("Retrieving info for " + tableName);
+ tableOps.getLocalityGroups(tableName);
+ tableOps.getProperties(tableName);
+ tableOps.listSplits(tableName);
+ tableOps.list();
+ } else {
+ log.debug("Clearing locator cache for " + tableName);
+ tableOps.clearLocatorCache(tableName);
+ }
+
+ if (rand.nextInt(10) < 3) {
+ Map<String,Set<Text>> groups = tableOps.getLocalityGroups(state.getString("imageTableName"));
+
+ if (groups.size() == 0) {
+ log.debug("Adding locality groups to " + state.getString("imageTableName"));
+ groups = ImageFixture.getLocalityGroups();
+ } else {
+ log.debug("Removing locality groups from " + state.getString("imageTableName"));
+ groups = new HashMap<>();
+ }
+
+ tableOps.setLocalityGroups(state.getString("imageTableName"), groups);
+ }
+ }
+}