You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mw...@apache.org on 2017/01/03 20:55:52 UTC
[3/7] accumulo-testing git commit: ACCUMULO-4510 Adding Randomwalk
code from Accumulo
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SecurityHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SecurityHelper.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SecurityHelper.java
new file mode 100644
index 0000000..93c7f02
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SecurityHelper.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.security;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SecurityHelper {
+ private static final Logger log = LoggerFactory.getLogger(SecurityHelper.class);
+
+ private static final String tableName = "secTableName";
+ private static final String masterName = "sysUserName";
+ private static final String tUserName = "tabUserName";
+
+ private static final String masterPass = "sysUserPass";
+ private static final String tUserPass = "tabUserPass";
+
+ private static final String tUserExists = "tabUserExists";
+ private static final String tableExists = "secTableExists";
+
+ private static final String masterConn = "sysUserConn";
+
+ private static final String authsMap = "authorizationsCountMap";
+ private static final String lastKey = "lastMutationKey";
+ private static final String filesystem = "securityFileSystem";
+
+ public static String getTableName(State state) {
+ return state.getString(tableName);
+ }
+
+ public static void setTableName(State state, String tName) {
+ state.set(tableName, tName);
+ }
+
+ public static String getSysUserName(State state) {
+ return state.getString(masterName);
+ }
+
+ public static void setSysUserName(State state, String sysUserName) {
+ state.set(masterName, sysUserName);
+ }
+
+ public static String getTabUserName(State state) {
+ return state.getString(tUserName);
+ }
+
+ public static void setTabUserName(State state, String tabUserName) {
+ state.set(tUserName, tabUserName);
+ }
+
+ public static byte[] getSysUserPass(State state) {
+ return (byte[]) state.get(masterPass);
+ }
+
+ public static void setSysUserPass(State state, byte[] sysUserPass) {
+ log.debug("Setting system user pass to " + new String(sysUserPass, UTF_8));
+ state.set(masterPass, sysUserPass);
+ state.set(masterPass + "time", System.currentTimeMillis());
+
+ }
+
+ public static boolean sysUserPassTransient(State state) {
+ return System.currentTimeMillis() - state.getLong(masterPass + "time") < 1000;
+ }
+
+ public static byte[] getTabUserPass(State state) {
+ return (byte[]) state.get(tUserPass);
+ }
+
+ public static void setTabUserPass(State state, byte[] tabUserPass) {
+ log.debug("Setting table user pass to " + new String(tabUserPass, UTF_8));
+ state.set(tUserPass, tabUserPass);
+ state.set(tUserPass + "time", System.currentTimeMillis());
+ }
+
+ public static boolean tabUserPassTransient(State state) {
+ return System.currentTimeMillis() - state.getLong(tUserPass + "time") < 1000;
+ }
+
+ public static boolean getTabUserExists(State state) {
+ return Boolean.parseBoolean(state.getString(tUserExists));
+ }
+
+ public static void setTabUserExists(State state, boolean exists) {
+ state.set(tUserExists, Boolean.toString(exists));
+ }
+
+ public static boolean getTableExists(State state) {
+ return Boolean.parseBoolean(state.getString(tableExists));
+ }
+
+ public static void setTableExists(State state, boolean exists) {
+ state.set(tableExists, Boolean.toString(exists));
+ }
+
+ public static Connector getSystemConnector(State state) {
+ return (Connector) state.get(masterConn);
+ }
+
+ public static void setSystemConnector(State state, Connector conn) {
+ state.set(masterConn, conn);
+ }
+
+ public static boolean getTabPerm(State state, String userName, TablePermission tp) {
+ return Boolean.parseBoolean(state.getString("Tab" + userName + tp.name()));
+ }
+
+ public static void setTabPerm(State state, String userName, TablePermission tp, boolean value) {
+ log.debug((value ? "Gave" : "Took") + " the table permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
+ state.set("Tab" + userName + tp.name(), Boolean.toString(value));
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE))
+ state.set("Tab" + userName + tp.name() + "time", System.currentTimeMillis());
+
+ }
+
+ public static boolean getSysPerm(State state, String userName, SystemPermission tp) {
+ return Boolean.parseBoolean(state.getString("Sys" + userName + tp.name()));
+ }
+
+ public static void setSysPerm(State state, String userName, SystemPermission tp, boolean value) {
+ log.debug((value ? "Gave" : "Took") + " the system permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
+ state.set("Sys" + userName + tp.name(), Boolean.toString(value));
+ }
+
+ public static Authorizations getUserAuths(State state, String target) {
+ return (Authorizations) state.get(target + "_auths");
+ }
+
+ public static void setUserAuths(State state, String target, Authorizations auths) {
+ state.set(target + "_auths", auths);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String,Integer> getAuthsMap(State state) {
+ return (Map<String,Integer>) state.get(authsMap);
+ }
+
+ public static void setAuthsMap(State state, Map<String,Integer> map) {
+ state.set(authsMap, map);
+ }
+
+ public static String[] getAuthsArray() {
+ return new String[] {"Fishsticks", "PotatoSkins", "Ribs", "Asparagus", "Paper", "Towels", "Lint", "Brush", "Celery"};
+ }
+
+ public static String getLastKey(State state) {
+ return state.getString(lastKey);
+ }
+
+ public static void setLastKey(State state, String key) {
+ state.set(lastKey, key);
+ }
+
+ public static void increaseAuthMap(State state, String s, int increment) {
+ Integer curVal = getAuthsMap(state).get(s);
+ if (curVal == null) {
+ curVal = Integer.valueOf(0);
+ getAuthsMap(state).put(s, curVal);
+ }
+ curVal += increment;
+ }
+
+ public static FileSystem getFs(State state) {
+ FileSystem fs = null;
+ try {
+ fs = (FileSystem) state.get(filesystem);
+ } catch (RuntimeException re) {}
+
+ if (fs == null) {
+ try {
+ fs = FileSystem.get(CachedConfiguration.getInstance());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ state.set(filesystem, fs);
+ }
+ return fs;
+ }
+
+ public static boolean inAmbiguousZone(State state, String userName, TablePermission tp) {
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE)) {
+ Long setTime = (Long) state.get("Tab" + userName + tp.name() + "time");
+ if (System.currentTimeMillis() < (setTime + 1000))
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SetAuths.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SetAuths.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SetAuths.java
new file mode 100644
index 0000000..54ab69f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/SetAuths.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.security;
+
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class SetAuths extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ String authsString = props.getProperty("auths", "_random");
+
+ String targetUser = props.getProperty("system");
+ String target;
+ String authPrincipal;
+ AuthenticationToken authToken;
+ if ("table".equals(targetUser)) {
+ target = WalkingSecurity.get(state, env).getTabUserName();
+ authPrincipal = WalkingSecurity.get(state, env).getSysUserName();
+ authToken = WalkingSecurity.get(state, env).getSysToken();
+ } else {
+ target = WalkingSecurity.get(state, env).getSysUserName();
+ authPrincipal = env.getUserName();
+ authToken = env.getToken();
+ }
+ Connector conn = env.getInstance().getConnector(authPrincipal, authToken);
+
+ boolean exists = WalkingSecurity.get(state, env).userExists(target);
+ boolean hasPermission = WalkingSecurity.get(state, env).canChangeAuthorizations(new Credentials(authPrincipal, authToken).toThrift(env.getInstance()),
+ target);
+
+ Authorizations auths;
+ if (authsString.equals("_random")) {
+ String[] possibleAuths = WalkingSecurity.get(state, env).getAuthsArray();
+
+ Random r = new Random();
+ int i = r.nextInt(possibleAuths.length);
+ String[] authSet = new String[i];
+ int length = possibleAuths.length;
+ for (int j = 0; j < i; j++) {
+ int nextRand = r.nextInt(length);
+ authSet[j] = possibleAuths[nextRand];
+ length--;
+ possibleAuths[nextRand] = possibleAuths[length];
+ possibleAuths[length] = authSet[j];
+ }
+ auths = new Authorizations(authSet);
+ } else {
+ auths = new Authorizations(authsString.split(","));
+ }
+
+ try {
+ conn.securityOperations().changeUserAuthorizations(target, auths);
+ } catch (AccumuloSecurityException ae) {
+ switch (ae.getSecurityErrorCode()) {
+ case PERMISSION_DENIED:
+ if (hasPermission)
+ throw new AccumuloException("Got a security exception when I should have had permission.", ae);
+ else
+ return;
+ case USER_DOESNT_EXIST:
+ if (exists)
+ throw new AccumuloException("Got security exception when the user should have existed", ae);
+ else
+ return;
+ default:
+ throw new AccumuloException("Got unexpected exception", ae);
+ }
+ }
+ WalkingSecurity.get(state, env).changeAuthorizations(target, auths);
+ if (!hasPermission)
+ throw new AccumuloException("Didn't get Security Exception when we should have");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/TableOp.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/TableOp.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/TableOp.java
new file mode 100644
index 0000000..d3335c4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/TableOp.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.security;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.security.SecurityErrorCode;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class TableOp extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ Connector conn = env.getInstance().getConnector(WalkingSecurity.get(state, env).getTabUserName(), WalkingSecurity.get(state, env).getTabToken());
+
+ String action = props.getProperty("action", "_random");
+ TablePermission tp;
+ if ("_random".equalsIgnoreCase(action)) {
+ Random r = new Random();
+ tp = TablePermission.values()[r.nextInt(TablePermission.values().length)];
+ } else {
+ tp = TablePermission.valueOf(action);
+ }
+
+ boolean tableExists = WalkingSecurity.get(state, env).getTableExists();
+ String tableName = WalkingSecurity.get(state, env).getTableName();
+ String namespaceName = WalkingSecurity.get(state, env).getNamespaceName();
+
+ switch (tp) {
+ case READ: {
+ boolean canRead = WalkingSecurity.get(state, env).canScan(WalkingSecurity.get(state, env).getTabCredentials(), tableName, namespaceName);
+ Authorizations auths = WalkingSecurity.get(state, env).getUserAuthorizations(WalkingSecurity.get(state, env).getTabCredentials());
+ boolean ambiguousZone = WalkingSecurity.get(state, env).inAmbiguousZone(conn.whoami(), tp);
+ boolean ambiguousAuths = WalkingSecurity.get(state, env).ambiguousAuthorizations(conn.whoami());
+
+ Scanner scan = null;
+ try {
+ scan = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(conn.whoami()));
+ int seen = 0;
+ Iterator<Entry<Key,Value>> iter = scan.iterator();
+ while (iter.hasNext()) {
+ Entry<Key,Value> entry = iter.next();
+ Key k = entry.getKey();
+ seen++;
+ if (!auths.contains(k.getColumnVisibilityData()) && !ambiguousAuths)
+ throw new AccumuloException("Got data I should not be capable of seeing: " + k + " table " + tableName);
+ }
+ if (!canRead && !ambiguousZone)
+ throw new AccumuloException("Was able to read when I shouldn't have had the perm with connection user " + conn.whoami() + " table " + tableName);
+ for (Entry<String,Integer> entry : WalkingSecurity.get(state, env).getAuthsMap().entrySet()) {
+ if (auths.contains(entry.getKey().getBytes(UTF_8)))
+ seen = seen - entry.getValue();
+ }
+ if (seen != 0 && !ambiguousAuths)
+ throw new AccumuloException("Got mismatched amounts of data");
+ } catch (TableNotFoundException tnfe) {
+ if (tableExists)
+ throw new AccumuloException("Accumulo and test suite out of sync: table " + tableName, tnfe);
+ return;
+ } catch (AccumuloSecurityException ae) {
+ if (ae.getSecurityErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
+ if (canRead && !ambiguousZone)
+ throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, ae);
+ else
+ return;
+ }
+ if (ae.getSecurityErrorCode().equals(SecurityErrorCode.BAD_AUTHORIZATIONS)) {
+ if (ambiguousAuths)
+ return;
+ else
+ throw new AccumuloException("Mismatched authorizations! ", ae);
+ }
+ throw new AccumuloException("Unexpected exception!", ae);
+ } catch (RuntimeException re) {
+ if (re.getCause() instanceof AccumuloSecurityException
+ && ((AccumuloSecurityException) re.getCause()).getSecurityErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
+ if (canRead && !ambiguousZone)
+ throw new AccumuloException("Table read permission out of sync with Accumulo: table " + tableName, re.getCause());
+ else
+ return;
+ }
+ if (re.getCause() instanceof AccumuloSecurityException
+ && ((AccumuloSecurityException) re.getCause()).getSecurityErrorCode().equals(SecurityErrorCode.BAD_AUTHORIZATIONS)) {
+ if (ambiguousAuths)
+ return;
+ else
+ throw new AccumuloException("Mismatched authorizations! ", re.getCause());
+ }
+
+ throw new AccumuloException("Unexpected exception!", re);
+ } finally {
+ if (scan != null) {
+ scan.close();
+ scan = null;
+ }
+
+ }
+
+ break;
+ }
+ case WRITE:
+ boolean canWrite = WalkingSecurity.get(state, env).canWrite(WalkingSecurity.get(state, env).getTabCredentials(), tableName, namespaceName);
+ boolean ambiguousZone = WalkingSecurity.get(state, env).inAmbiguousZone(conn.whoami(), tp);
+
+ String key = WalkingSecurity.get(state, env).getLastKey() + "1";
+ Mutation m = new Mutation(new Text(key));
+ for (String s : WalkingSecurity.get(state, env).getAuthsArray()) {
+ m.put(new Text(), new Text(), new ColumnVisibility(s), new Value("value".getBytes(UTF_8)));
+ }
+ BatchWriter writer = null;
+ try {
+ try {
+ writer = conn.createBatchWriter(tableName, new BatchWriterConfig().setMaxMemory(9000l).setMaxWriteThreads(1));
+ } catch (TableNotFoundException tnfe) {
+ if (tableExists)
+ throw new AccumuloException("Table didn't exist when it should have: " + tableName);
+ return;
+ }
+ boolean works = true;
+ try {
+ writer.addMutation(m);
+ writer.close();
+ } catch (MutationsRejectedException mre) {
+ // Currently no method for detecting reason for mre. Waiting on ACCUMULO-670
+ // For now, just wait a second and go again if they can write!
+ if (!canWrite)
+ return;
+
+ if (ambiguousZone) {
+ Thread.sleep(1000);
+ try {
+ writer = conn.createBatchWriter(tableName, new BatchWriterConfig().setMaxWriteThreads(1));
+ writer.addMutation(m);
+ writer.close();
+ writer = null;
+ } catch (MutationsRejectedException mre2) {
+ throw new AccumuloException("Mutation exception!", mre2);
+ }
+ }
+ }
+ if (works)
+ for (String s : WalkingSecurity.get(state, env).getAuthsArray())
+ WalkingSecurity.get(state, env).increaseAuthMap(s, 1);
+ } finally {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ }
+ break;
+ case BULK_IMPORT:
+ key = WalkingSecurity.get(state, env).getLastKey() + "1";
+ SortedSet<Key> keys = new TreeSet<>();
+ for (String s : WalkingSecurity.get(state, env).getAuthsArray()) {
+ Key k = new Key(key, "", "", s);
+ keys.add(k);
+ }
+ Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
+ Path fail = new Path(dir.toString() + "_fail");
+ FileSystem fs = WalkingSecurity.get(state, env).getFs();
+ FileSKVWriter f = FileOperations.getInstance().newWriterBuilder().forFile(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf())
+ .withTableConfiguration(AccumuloConfiguration.getDefaultConfiguration()).build();
+ f.startDefaultLocalityGroup();
+ fs.mkdirs(fail);
+ for (Key k : keys)
+ f.append(k, new Value("Value".getBytes(UTF_8)));
+ f.close();
+ try {
+ conn.tableOperations().importDirectory(tableName, dir.toString(), fail.toString(), true);
+ } catch (TableNotFoundException tnfe) {
+ if (tableExists)
+ throw new AccumuloException("Table didn't exist when it should have: " + tableName);
+ return;
+ } catch (AccumuloSecurityException ae) {
+ if (ae.getSecurityErrorCode().equals(SecurityErrorCode.PERMISSION_DENIED)) {
+ if (WalkingSecurity.get(state, env).canBulkImport(WalkingSecurity.get(state, env).getTabCredentials(), tableName, namespaceName))
+ throw new AccumuloException("Bulk Import failed when it should have worked: " + tableName);
+ return;
+ } else if (ae.getSecurityErrorCode().equals(SecurityErrorCode.BAD_CREDENTIALS)) {
+ if (WalkingSecurity.get(state, env).userPassTransient(conn.whoami()))
+ return;
+ }
+ throw new AccumuloException("Unexpected exception!", ae);
+ }
+ for (String s : WalkingSecurity.get(state, env).getAuthsArray())
+ WalkingSecurity.get(state, env).increaseAuthMap(s, 1);
+ fs.delete(dir, true);
+ fs.delete(fail, true);
+
+ if (!WalkingSecurity.get(state, env).canBulkImport(WalkingSecurity.get(state, env).getTabCredentials(), tableName, namespaceName))
+ throw new AccumuloException("Bulk Import succeeded when it should have failed: " + dir + " table " + tableName);
+ break;
+ case ALTER_TABLE:
+ AlterTable.renameTable(conn, state, env, tableName, tableName + "plus",
+ WalkingSecurity.get(state, env).canAlterTable(WalkingSecurity.get(state, env).getTabCredentials(), tableName, namespaceName), tableExists);
+ break;
+
+ case GRANT:
+ props.setProperty("task", "grant");
+ props.setProperty("perm", "random");
+ props.setProperty("source", "table");
+ props.setProperty("target", "system");
+ AlterTablePerm.alter(state, env, props);
+ break;
+
+ case DROP_TABLE:
+ props.setProperty("source", "table");
+ DropTable.dropTable(state, env, props);
+ break;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/Validate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/Validate.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/Validate.java
new file mode 100644
index 0000000..c28f28d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/Validate.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.security;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.SecurityErrorCode;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.log4j.Logger;
+
+public class Validate extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ validate(state, env, log);
+ }
+
+ public static void validate(State state, Environment env, Logger log) throws Exception {
+ Connector conn = env.getConnector();
+
+ boolean tableExists = WalkingSecurity.get(state, env).getTableExists();
+ boolean cloudTableExists = conn.tableOperations().list().contains(WalkingSecurity.get(state, env).getTableName());
+ if (tableExists != cloudTableExists)
+ throw new AccumuloException("Table existance out of sync");
+
+ boolean tableUserExists = WalkingSecurity.get(state, env).userExists(WalkingSecurity.get(state, env).getTabUserName());
+ boolean cloudTableUserExists = conn.securityOperations().listLocalUsers().contains(WalkingSecurity.get(state, env).getTabUserName());
+ if (tableUserExists != cloudTableUserExists)
+ throw new AccumuloException("Table User existance out of sync");
+
+ Properties props = new Properties();
+ props.setProperty("target", "system");
+ Authenticate.authenticate(env.getUserName(), env.getToken(), state, env, props);
+ props.setProperty("target", "table");
+ Authenticate.authenticate(env.getUserName(), env.getToken(), state, env, props);
+
+ for (String user : new String[] {WalkingSecurity.get(state, env).getSysUserName(), WalkingSecurity.get(state, env).getTabUserName()}) {
+ for (SystemPermission sp : SystemPermission.values()) {
+ boolean hasSp = WalkingSecurity.get(state, env).hasSystemPermission(user, sp);
+ boolean accuHasSp;
+ try {
+ accuHasSp = conn.securityOperations().hasSystemPermission(user, sp);
+ log.debug("Just checked to see if user " + user + " has system perm " + sp.name() + " with answer " + accuHasSp);
+ } catch (AccumuloSecurityException ae) {
+ if (ae.getSecurityErrorCode().equals(SecurityErrorCode.USER_DOESNT_EXIST)) {
+ if (tableUserExists)
+ throw new AccumuloException("Got user DNE error when they should", ae);
+ else
+ continue;
+ } else
+ throw new AccumuloException("Unexpected exception!", ae);
+ }
+ if (hasSp != accuHasSp)
+ throw new AccumuloException(user + " existance out of sync for system perm " + sp + " hasSp/CloudhasSP " + hasSp + " " + accuHasSp);
+ }
+
+ for (TablePermission tp : TablePermission.values()) {
+ boolean hasTp = WalkingSecurity.get(state, env).hasTablePermission(user, WalkingSecurity.get(state, env).getTableName(), tp);
+ boolean accuHasTp;
+ try {
+ accuHasTp = conn.securityOperations().hasTablePermission(user, WalkingSecurity.get(state, env).getTableName(), tp);
+ log.debug("Just checked to see if user " + user + " has table perm " + tp.name() + " with answer " + accuHasTp);
+ } catch (AccumuloSecurityException ae) {
+ if (ae.getSecurityErrorCode().equals(SecurityErrorCode.USER_DOESNT_EXIST)) {
+ if (tableUserExists)
+ throw new AccumuloException("Got user DNE error when they should", ae);
+ else
+ continue;
+ } else if (ae.getSecurityErrorCode().equals(SecurityErrorCode.TABLE_DOESNT_EXIST)) {
+ if (tableExists)
+ throw new AccumuloException("Got table DNE when it should", ae);
+ else
+ continue;
+ } else
+ throw new AccumuloException("Unexpected exception!", ae);
+ }
+ if (hasTp != accuHasTp)
+ throw new AccumuloException(user + " existance out of sync for table perm " + tp + " hasTp/CloudhasTP " + hasTp + " " + accuHasTp);
+ }
+
+ }
+
+ Authorizations accuAuths;
+ Authorizations auths;
+ try {
+ auths = WalkingSecurity.get(state, env).getUserAuthorizations(WalkingSecurity.get(state, env).getTabCredentials());
+ accuAuths = conn.securityOperations().getUserAuthorizations(WalkingSecurity.get(state, env).getTabUserName());
+ } catch (ThriftSecurityException ae) {
+ if (ae.getCode() == org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode.USER_DOESNT_EXIST) {
+ if (tableUserExists)
+ throw new AccumuloException("Table user didn't exist when they should.", ae);
+ else
+ return;
+ }
+ throw new AccumuloException("Unexpected exception!", ae);
+ }
+ if (!auths.equals(accuAuths))
+ throw new AccumuloException("Table User authorizations out of sync");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/WalkingSecurity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/WalkingSecurity.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/WalkingSecurity.java
new file mode 100644
index 0000000..302d6ec
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/security/WalkingSecurity.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.security;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.NamespaceNotFoundException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.NamespacePermission;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.handler.Authenticator;
+import org.apache.accumulo.server.security.handler.Authorizor;
+import org.apache.accumulo.server.security.handler.PermissionHandler;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class WalkingSecurity extends SecurityOperation implements Authorizor, Authenticator, PermissionHandler {
+ State state = null;
+ Environment env = null;
+ private static final Logger log = LoggerFactory.getLogger(WalkingSecurity.class);
+
+ private static final String tableName = "SecurityTableName";
+ private static final String namespaceName = "SecurityNamespaceName";
+ private static final String userName = "UserName";
+
+ private static final String userPass = "UserPass";
+ private static final String userExists = "UserExists";
+ private static final String tableExists = "TableExists";
+ private static final String namespaceExists = "NamespaceExists";
+
+ private static final String connector = "UserConnection";
+
+ private static final String authsMap = "authorizationsCountMap";
+ private static final String lastKey = "lastMutationKey";
+ private static final String filesystem = "securityFileSystem";
+
+ private static WalkingSecurity instance = null;
+
+ public WalkingSecurity(AccumuloServerContext context, Authorizor author, Authenticator authent, PermissionHandler pm) {
+ super(context, author, authent, pm);
+ }
+
+ public WalkingSecurity(State state2, Environment env2) {
+ super(new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())));
+ this.state = state2;
+ this.env = env2;
+ authorizor = this;
+ authenticator = this;
+ permHandle = this;
+ }
+
+ public static WalkingSecurity get(State state, Environment env) {
+ if (instance == null || instance.state != state) {
+ instance = new WalkingSecurity(state, env);
+ state.set(tableExists, Boolean.toString(false));
+ state.set(namespaceExists, Boolean.toString(false));
+ state.set(authsMap, new HashMap<String,Integer>());
+ }
+
+ return instance;
+ }
+
+ @Override
+ public void initialize(String instanceId, boolean initialize) {
+ throw new UnsupportedOperationException("nope");
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authenticator one, PermissionHandler two) {
+ return this.getClass().equals(one.getClass()) && this.getClass().equals(two.getClass());
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authenticator one, Authorizor two) {
+ return this.getClass().equals(one.getClass()) && this.getClass().equals(two.getClass());
+ }
+
+ @Override
+ public boolean validSecurityHandlers(Authorizor one, PermissionHandler two) {
+ return this.getClass().equals(one.getClass()) && this.getClass().equals(two.getClass());
+ }
+
+ @Override
+ public void initializeSecurity(TCredentials rootuser, String token) throws ThriftSecurityException {
+ throw new UnsupportedOperationException("nope");
+ }
+
+ @Override
+ public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException {
+ state.set(user + "_auths", authorizations);
+ state.set("Auths-" + user + '-' + "time", System.currentTimeMillis());
+ }
+
+ @Override
+ public Authorizations getCachedUserAuthorizations(String user) throws AccumuloSecurityException {
+ return (Authorizations) state.get(user + "_auths");
+ }
+
+ public boolean ambiguousAuthorizations(String userName) {
+ Long setTime = state.getLong("Auths-" + userName + '-' + "time");
+ if (setTime == null)
+ throw new RuntimeException("WTF? Auths-" + userName + '-' + "time is null");
+ if (System.currentTimeMillis() < (setTime + 1000))
+ return true;
+ return false;
+ }
+
+ @Override
+ public void initUser(String user) throws AccumuloSecurityException {
+ changeAuthorizations(user, new Authorizations());
+ }
+
+ @Override
+ public Set<String> listUsers() throws AccumuloSecurityException {
+ Set<String> userList = new TreeSet<>();
+ for (String user : new String[] {getSysUserName(), getTabUserName()}) {
+ if (userExists(user))
+ userList.add(user);
+ }
+ return userList;
+ }
+
+ @Override
+ public boolean authenticateUser(String principal, AuthenticationToken token) {
+ PasswordToken pass = (PasswordToken) state.get(principal + userPass);
+ boolean ret = pass.equals(token);
+ return ret;
+ }
+
+ @Override
+ public void createUser(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ state.set(principal + userExists, Boolean.toString(true));
+ changePassword(principal, token);
+ cleanUser(principal);
+ }
+
+ @Override
+ public void dropUser(String user) throws AccumuloSecurityException {
+ state.set(user + userExists, Boolean.toString(false));
+ cleanUser(user);
+ if (user.equals(getTabUserName()))
+ state.set("table" + connector, null);
+ }
+
+ @Override
+ public void changePassword(String principal, AuthenticationToken token) throws AccumuloSecurityException {
+ state.set(principal + userPass, token);
+ state.set(principal + userPass + "time", System.currentTimeMillis());
+ }
+
+ @Override
+ public boolean userExists(String user) {
+ return Boolean.parseBoolean(state.getString(user + userExists));
+ }
+
+ @Override
+ public boolean hasSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ boolean res = Boolean.parseBoolean(state.getString("Sys-" + user + '-' + permission.name()));
+ return res;
+ }
+
+ @Override
+ public boolean hasCachedSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ return hasSystemPermission(user, permission);
+ }
+
+ @Override
+ public boolean hasTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return Boolean.parseBoolean(state.getString("Tab-" + user + '-' + permission.name()));
+ }
+
+ @Override
+ public boolean hasCachedTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ return hasTablePermission(user, table, permission);
+ }
+
+ @Override
+ public boolean hasNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return Boolean.parseBoolean(state.getString("Nsp-" + user + '-' + permission.name()));
+ }
+
+ @Override
+ public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ return hasNamespacePermission(user, namespace, permission);
+ }
+
+ @Override
+ public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ setSysPerm(state, user, permission, true);
+ }
+
+ @Override
+ public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException {
+ setSysPerm(state, user, permission, false);
+ }
+
+ @Override
+ public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ setTabPerm(state, user, permission, table, true);
+ }
+
+ private static void setSysPerm(State state, String userName, SystemPermission tp, boolean value) {
+ log.debug((value ? "Gave" : "Took") + " the system permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
+ state.set("Sys-" + userName + '-' + tp.name(), Boolean.toString(value));
+ }
+
+ private void setTabPerm(State state, String userName, TablePermission tp, String table, boolean value) {
+ if (table.equals(userName))
+ throw new RuntimeException("This is also fucked up");
+ log.debug((value ? "Gave" : "Took") + " the table permission " + tp.name() + (value ? " to" : " from") + " user " + userName);
+ state.set("Tab-" + userName + '-' + tp.name(), Boolean.toString(value));
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE))
+ state.set("Tab-" + userName + '-' + tp.name() + '-' + "time", System.currentTimeMillis());
+ }
+
+ @Override
+ public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException, TableNotFoundException {
+ setTabPerm(state, user, permission, table, false);
+ }
+
+ @Override
+ public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ setNspPerm(state, user, permission, namespace, true);
+ }
+
+ private void setNspPerm(State state, String userName, NamespacePermission tnp, String namespace, boolean value) {
+ if (namespace.equals(userName))
+ throw new RuntimeException("I don't even know");
+ log.debug((value ? "Gave" : "Took") + " the table permission " + tnp.name() + (value ? " to" : " from") + " user " + userName);
+ state.set("Nsp-" + userName + '-' + tnp.name(), Boolean.toString(value));
+ if (tnp.equals(NamespacePermission.READ) || tnp.equals(NamespacePermission.WRITE))
+ state.set("Nsp-" + userName + '-' + tnp.name() + '-' + "time", System.currentTimeMillis());
+ }
+
+ @Override
+ public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException,
+ NamespaceNotFoundException {
+ setNspPerm(state, user, permission, namespace, false);
+ }
+
+ @Override
+ public void cleanTablePermissions(String table) throws AccumuloSecurityException, TableNotFoundException {
+ for (String user : new String[] {getSysUserName(), getTabUserName()}) {
+ for (TablePermission tp : TablePermission.values()) {
+ revokeTablePermission(user, table, tp);
+ }
+ }
+ state.set(tableExists, Boolean.toString(false));
+ }
+
+ @Override
+ public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException, NamespaceNotFoundException {
+ for (String user : new String[] {getSysUserName(), getNspUserName()}) {
+ for (NamespacePermission tnp : NamespacePermission.values()) {
+ revokeNamespacePermission(user, namespace, tnp);
+ }
+ }
+ state.set(namespaceExists, Boolean.toString(false));
+ }
+
+ @Override
+ public void cleanUser(String user) throws AccumuloSecurityException {
+ if (getTableExists())
+ for (TablePermission tp : TablePermission.values())
+ try {
+ revokeTablePermission(user, getTableName(), tp);
+ } catch (TableNotFoundException e) {
+ // ignore
+ }
+ for (SystemPermission sp : SystemPermission.values())
+ revokeSystemPermission(user, sp);
+ }
+
+ public String getTabUserName() {
+ return state.getString("table" + userName);
+ }
+
+ public String getSysUserName() {
+ return state.getString("system" + userName);
+ }
+
+ public String getNspUserName() {
+ return state.getString("namespace" + userName);
+ }
+
+ public void setTabUserName(String name) {
+ state.set("table" + userName, name);
+ state.set(name + userExists, Boolean.toString(false));
+ }
+
+ public void setNspUserName(String name) {
+ state.set("namespace" + userName, name);
+ state.set(name + userExists, Boolean.toString(false));
+ }
+
+ public void setSysUserName(String name) {
+ state.set("system" + userName, name);
+ }
+
+ public String getTableName() {
+ return state.getString(tableName);
+ }
+
+ public String getNamespaceName() {
+ return state.getString(namespaceName);
+ }
+
+ public boolean getTableExists() {
+ return Boolean.parseBoolean(state.getString(tableExists));
+ }
+
+ public boolean getNamespaceExists() {
+ return Boolean.parseBoolean(state.getString(namespaceExists));
+ }
+
+ public TCredentials getSysCredentials() {
+ return new Credentials(getSysUserName(), getSysToken()).toThrift(this.env.getInstance());
+ }
+
+ public TCredentials getTabCredentials() {
+ return new Credentials(getTabUserName(), getTabToken()).toThrift(this.env.getInstance());
+ }
+
+ public AuthenticationToken getSysToken() {
+ return new PasswordToken(getSysPassword());
+ }
+
+ public AuthenticationToken getTabToken() {
+ return new PasswordToken(getTabPassword());
+ }
+
+ public byte[] getUserPassword(String user) {
+ Object obj = state.get(user + userPass);
+ if (obj instanceof PasswordToken) {
+ return ((PasswordToken) obj).getPassword();
+ }
+ return null;
+ }
+
+ public byte[] getSysPassword() {
+ Object obj = state.get(getSysUserName() + userPass);
+ if (obj instanceof PasswordToken) {
+ return ((PasswordToken) obj).getPassword();
+ }
+ return null;
+ }
+
+ public byte[] getTabPassword() {
+ Object obj = state.get(getTabUserName() + userPass);
+ if (obj instanceof PasswordToken) {
+ return ((PasswordToken) obj).getPassword();
+ }
+ return null;
+ }
+
+ public boolean userPassTransient(String user) {
+ return System.currentTimeMillis() - state.getLong(user + userPass + "time") < 1000;
+ }
+
+ public void setTableName(String tName) {
+ state.set(tableName, tName);
+ }
+
+ public void setNamespaceName(String nsName) {
+ state.set(namespaceName, nsName);
+ }
+
+ @Override
+ public void initTable(String table) throws AccumuloSecurityException {
+ state.set(tableExists, Boolean.toString(true));
+ state.set(tableName, table);
+ }
+
+ public String[] getAuthsArray() {
+ return new String[] {"Fishsticks", "PotatoSkins", "Ribs", "Asparagus", "Paper", "Towels", "Lint", "Brush", "Celery"};
+ }
+
+ public boolean inAmbiguousZone(String userName, TablePermission tp) {
+ if (tp.equals(TablePermission.READ) || tp.equals(TablePermission.WRITE)) {
+ Long setTime = state.getLong("Tab-" + userName + '-' + tp.name() + '-' + "time");
+ if (setTime == null)
+ throw new RuntimeException("WTF? Tab-" + userName + '-' + tp.name() + '-' + "time is null");
+ if (System.currentTimeMillis() < (setTime + 1000))
+ return true;
+ }
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String,Integer> getAuthsMap() {
+ return (Map<String,Integer>) state.get(authsMap);
+ }
+
+ public String getLastKey() {
+ return state.getString(lastKey);
+ }
+
+ public void increaseAuthMap(String s, int increment) {
+ Integer curVal = getAuthsMap().get(s);
+ if (curVal == null) {
+ curVal = Integer.valueOf(0);
+ getAuthsMap().put(s, curVal);
+ }
+ curVal += increment;
+ }
+
+ public FileSystem getFs() {
+ FileSystem fs = null;
+ try {
+ fs = (FileSystem) state.get(filesystem);
+ } catch (RuntimeException re) {}
+
+ if (fs == null) {
+ try {
+ fs = FileSystem.get(CachedConfiguration.getInstance());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ state.set(filesystem, fs);
+ }
+ return fs;
+ }
+
+ @Override
+ public boolean canAskAboutUser(TCredentials credentials, String user) throws ThriftSecurityException {
+ try {
+ return super.canAskAboutUser(credentials, user);
+ } catch (ThriftSecurityException tse) {
+ if (tse.getCode().equals(SecurityErrorCode.PERMISSION_DENIED))
+ return false;
+ throw tse;
+ }
+ }
+
+ @Override
+ public boolean validTokenClass(String tokenClass) {
+ return tokenClass.equals(PasswordToken.class.getName());
+ }
+
+ public static void clearInstance() {
+ instance = null;
+ }
+
+ @Override
+ public Set<Class<? extends AuthenticationToken>> getSupportedTokenTypes() {
+ Set<Class<? extends AuthenticationToken>> cs = new HashSet<>();
+ cs.add(PasswordToken.class);
+ return cs;
+ }
+
+ @Override
+ public boolean isValidAuthorizations(String user, List<ByteBuffer> auths) throws AccumuloSecurityException {
+ Collection<ByteBuffer> userauths = getCachedUserAuthorizations(user).getAuthorizationsBB();
+ for (ByteBuffer auth : auths)
+ if (!userauths.contains(auth))
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/BatchVerify.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/BatchVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/BatchVerify.java
new file mode 100644
index 0000000..ec50285
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/BatchVerify.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+public class BatchVerify extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ Random rand = new Random();
+
+ long numWrites = state.getLong("numWrites");
+ int maxVerify = Integer.parseInt(props.getProperty("maxVerify", "2000"));
+ long numVerify = rand.nextInt(maxVerify - 1) + 1;
+
+ if (numVerify > (numWrites / 4)) {
+ numVerify = numWrites / 4;
+ }
+
+ Connector conn = env.getConnector();
+ BatchScanner scanner = conn.createBatchScanner(state.getString("seqTableName"), new Authorizations(), 2);
+
+ try {
+ int count = 0;
+ List<Range> ranges = new ArrayList<>();
+ while (count < numVerify) {
+ long rangeStart = rand.nextInt((int) numWrites);
+ long rangeEnd = rangeStart + 99;
+ if (rangeEnd > (numWrites - 1)) {
+ rangeEnd = numWrites - 1;
+ }
+ count += rangeEnd - rangeStart + 1;
+ ranges.add(new Range(new Text(String.format("%010d", rangeStart)), new Text(String.format("%010d", rangeEnd))));
+ }
+
+ ranges = Range.mergeOverlapping(ranges);
+ if (ranges.size() > 1) {
+ Collections.sort(ranges);
+ }
+
+ if (count == 0 || ranges.size() == 0)
+ return;
+
+ log.debug(String.format("scanning %d rows in the following %d ranges:", count, ranges.size()));
+ for (Range r : ranges) {
+ log.debug(r);
+ }
+
+ scanner.setRanges(ranges);
+
+ List<Key> keys = new ArrayList<>();
+ for (Entry<Key,Value> entry : scanner) {
+ keys.add(entry.getKey());
+ }
+
+ log.debug("scan returned " + keys.size() + " rows. now verifying...");
+
+ Collections.sort(keys);
+
+ Iterator<Key> iterator = keys.iterator();
+ int curKey = Integer.parseInt(iterator.next().getRow().toString());
+ boolean done = false;
+ for (Range r : ranges) {
+ int start = Integer.parseInt(r.getStartKey().getRow().toString());
+ int end = Integer.parseInt(String.copyValueOf(r.getEndKey().getRow().toString().toCharArray(), 0, 10));
+ for (int i = start; i <= end; i++) {
+
+ if (done) {
+ log.error("missing key " + i);
+ break;
+ }
+
+ while (curKey < i) {
+ log.error("extra key " + curKey);
+ if (iterator.hasNext() == false) {
+ done = true;
+ break;
+ }
+ curKey = Integer.parseInt(iterator.next().getRow().toString());
+ }
+
+ if (curKey > i) {
+ log.error("missing key " + i);
+ }
+
+ if (iterator.hasNext()) {
+ curKey = Integer.parseInt(iterator.next().getRow().toString());
+ } else {
+ done = true;
+ }
+ }
+ }
+
+ log.debug("verify is now complete");
+ } finally {
+ scanner.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Commit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Commit.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Commit.java
new file mode 100644
index 0000000..6865557
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Commit.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class Commit extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ env.getMultiTableBatchWriter().flush();
+
+ log.debug("Committed " + state.getLong("numWrites") + " writes. Total writes: " + state.getLong("totalWrites"));
+ state.set("numWrites", Long.valueOf(0));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerify.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerify.java
new file mode 100644
index 0000000..58d44d4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerify.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.util.ToolRunner;
+
+public class MapRedVerify extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ String[] args = new String[8];
+ args[0] = "-libjars";
+ args[1] = getMapReduceJars();
+ args[2] = env.getUserName();
+ args[3] = env.getPassword();
+ if (null == args[3]) {
+ args[3] = env.getKeytab();
+ }
+ args[4] = state.getString("seqTableName");
+ args[5] = env.getInstance().getInstanceName();
+ args[6] = env.getConfigProperty("ZOOKEEPERS");
+ args[7] = args[4] + "_MR";
+
+ if (ToolRunner.run(CachedConfiguration.getInstance(), new MapRedVerifyTool(), args) != 0) {
+ log.error("Failed to run map/red verify");
+ return;
+ }
+
+ Scanner outputScanner = env.getConnector().createScanner(args[7], Authorizations.EMPTY);
+ outputScanner.setRange(new Range());
+
+ int count = 0;
+ Key lastKey = null;
+ for (Entry<Key,Value> entry : outputScanner) {
+ Key current = entry.getKey();
+ if (lastKey != null && lastKey.getColumnFamily().equals(current.getRow())) {
+ log.info(entry.getKey());
+ count++;
+ }
+ lastKey = current;
+ }
+
+ if (count > 1) {
+ log.error("Gaps in output");
+ }
+
+ log.debug("Dropping table: " + args[7]);
+ Connector conn = env.getConnector();
+ conn.tableOperations().delete(args[7]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerifyTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerifyTool.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerifyTool.java
new file mode 100644
index 0000000..6f785ce
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/MapRedVerifyTool.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Logger;
+
+public class MapRedVerifyTool extends Configured implements Tool {
+ protected final Logger log = Logger.getLogger(this.getClass());
+
+ public static class SeqMapClass extends Mapper<Key,Value,NullWritable,IntWritable> {
+ @Override
+ public void map(Key row, Value data, Context output) throws IOException, InterruptedException {
+ Integer num = Integer.valueOf(row.getRow().toString());
+ output.write(NullWritable.get(), new IntWritable(num.intValue()));
+ }
+ }
+
+ public static class SeqReduceClass extends Reducer<NullWritable,IntWritable,Text,Mutation> {
+ @Override
+ public void reduce(NullWritable ignore, Iterable<IntWritable> values, Context output) throws IOException, InterruptedException {
+ Iterator<IntWritable> iterator = values.iterator();
+
+ if (iterator.hasNext() == false) {
+ return;
+ }
+
+ int start = iterator.next().get();
+ int index = start;
+ while (iterator.hasNext()) {
+ int next = iterator.next().get();
+ if (next != index + 1) {
+ writeMutation(output, start, index);
+ start = next;
+ }
+ index = next;
+ }
+ writeMutation(output, start, index);
+ }
+
+ public void writeMutation(Context output, int start, int end) throws IOException, InterruptedException {
+ Mutation m = new Mutation(new Text(String.format("%010d", start)));
+ m.put(new Text(String.format("%010d", end)), new Text(""), new Value(new byte[0]));
+ output.write(null, m);
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
+ job.setJarByClass(this.getClass());
+
+ if (job.getJar() == null) {
+ log.error("M/R requires a jar file! Run mvn package.");
+ return 1;
+ }
+
+ ClientConfiguration clientConf = ClientConfiguration.loadDefault().withInstance(args[3]).withZkHosts(args[4]);
+
+ AccumuloInputFormat.setInputTableName(job, args[2]);
+ AccumuloInputFormat.setZooKeeperInstance(job, clientConf);
+ AccumuloOutputFormat.setDefaultTableName(job, args[5]);
+ AccumuloOutputFormat.setZooKeeperInstance(job, clientConf);
+
+ job.setInputFormatClass(AccumuloInputFormat.class);
+ if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ // Better be logged in
+ KerberosToken token = new KerberosToken();
+ try {
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+ if (!user.hasKerberosCredentials()) {
+ throw new IllegalStateException("Expected current user to have Kerberos credentials");
+ }
+
+ String newPrincipal = user.getUserName();
+
+ ZooKeeperInstance inst = new ZooKeeperInstance(clientConf);
+ Connector conn = inst.getConnector(newPrincipal, token);
+
+ // Do the explicit check to see if the user has the permission to get a delegation token
+ if (!conn.securityOperations().hasSystemPermission(conn.whoami(), SystemPermission.OBTAIN_DELEGATION_TOKEN)) {
+ log.error(newPrincipal + " doesn't have the " + SystemPermission.OBTAIN_DELEGATION_TOKEN.name()
+ + " SystemPermission neccesary to obtain a delegation token. MapReduce tasks cannot automatically use the client's"
+ + " credentials on remote servers. Delegation tokens provide a means to run MapReduce without distributing the user's credentials.");
+ throw new IllegalStateException(conn.whoami() + " does not have permission to obtain a delegation token");
+ }
+
+ // Fetch a delegation token from Accumulo
+ AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelegationTokenConfig());
+
+ // Set the delegation token instead of the kerberos token
+ AccumuloInputFormat.setConnectorInfo(job, newPrincipal, dt);
+ AccumuloOutputFormat.setConnectorInfo(job, newPrincipal, dt);
+ } catch (Exception e) {
+ final String msg = "Failed to acquire DelegationToken for use with MapReduce";
+ log.error(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ } else {
+ AccumuloInputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
+ AccumuloOutputFormat.setConnectorInfo(job, args[0], new PasswordToken(args[1]));
+ }
+
+ job.setMapperClass(SeqMapClass.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(IntWritable.class);
+
+ job.setReducerClass(SeqReduceClass.class);
+ job.setNumReduceTasks(1);
+
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+ AccumuloOutputFormat.setCreateTables(job, true);
+
+ job.waitForCompletion(true);
+ return job.isSuccessful() ? 0 : 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/SequentialFixture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/SequentialFixture.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/SequentialFixture.java
new file mode 100644
index 0000000..de8af18
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/SequentialFixture.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import java.net.InetAddress;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.Fixture;
+import org.apache.accumulo.testing.core.randomwalk.State;
+
+public class SequentialFixture extends Fixture {
+
+ String seqTableName;
+
+ @Override
+ public void setUp(State state, Environment env) throws Exception {
+
+ Connector conn = env.getConnector();
+ Instance instance = env.getInstance();
+
+ String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_");
+
+ seqTableName = String.format("sequential_%s_%s_%d", hostname, env.getPid(), System.currentTimeMillis());
+ state.set("seqTableName", seqTableName);
+
+ try {
+ conn.tableOperations().create(seqTableName);
+ log.debug("Created table " + seqTableName + " (id:" + Tables.getNameToIdMap(instance).get(seqTableName) + ")");
+ } catch (TableExistsException e) {
+ log.warn("Table " + seqTableName + " already exists!");
+ throw e;
+ }
+ conn.tableOperations().setProperty(seqTableName, "table.scan.max.memory", "1K");
+
+ state.set("numWrites", Long.valueOf(0));
+ state.set("totalWrites", Long.valueOf(0));
+ }
+
+ @Override
+ public void tearDown(State state, Environment env) throws Exception {
+ // We have resources we need to clean up
+ if (env.isMultiTableBatchWriterInitialized()) {
+ MultiTableBatchWriter mtbw = env.getMultiTableBatchWriter();
+ try {
+ mtbw.close();
+ } catch (MutationsRejectedException e) {
+ log.error("Ignoring mutations that weren't flushed", e);
+ }
+
+ // Reset the MTBW on the state to null
+ env.resetMultiTableBatchWriter();
+ }
+
+ log.debug("Dropping tables: " + seqTableName);
+
+ Connector conn = env.getConnector();
+
+ conn.tableOperations().delete(seqTableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Write.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Write.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Write.java
new file mode 100644
index 0000000..80e0e38
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/sequential/Write.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.sequential;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.io.Text;
+
+public class Write extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ BatchWriter bw = env.getMultiTableBatchWriter().getBatchWriter(state.getString("seqTableName"));
+
+ state.set("numWrites", state.getLong("numWrites") + 1);
+
+ Long totalWrites = state.getLong("totalWrites") + 1;
+ if (totalWrites % 10000 == 0) {
+ log.debug("Total writes: " + totalWrites);
+ }
+ state.set("totalWrites", totalWrites);
+
+ Mutation m = new Mutation(new Text(String.format("%010d", totalWrites)));
+ m.put(new Text("cf"), new Text("cq"), new Value("val".getBytes(UTF_8)));
+ bw.addMutation(m);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/BulkInsert.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/BulkInsert.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/BulkInsert.java
new file mode 100644
index 0000000..86afd8f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/BulkInsert.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.shard;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+
+public class BulkInsert extends Test {
+
+ class SeqfileBatchWriter implements BatchWriter {
+
+ SequenceFile.Writer writer;
+
+ SeqfileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
+ writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(fs.makeQualified(new Path(file))), SequenceFile.Writer.keyClass(Key.class),
+ SequenceFile.Writer.valueClass(Value.class));
+ }
+
+ @Override
+ public void addMutation(Mutation m) throws MutationsRejectedException {
+ List<ColumnUpdate> updates = m.getUpdates();
+ for (ColumnUpdate cu : updates) {
+ Key key = new Key(m.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), cu.getColumnVisibility(), Long.MAX_VALUE, false, false);
+ Value val = new Value(cu.getValue(), false);
+
+ try {
+ writer.append(key, val);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
+ for (Mutation mutation : iterable)
+ addMutation(mutation);
+ }
+
+ @Override
+ public void flush() throws MutationsRejectedException {}
+
+ @Override
+ public void close() throws MutationsRejectedException {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ String indexTableName = (String) state.get("indexTableName");
+ String dataTableName = (String) state.get("docTableName");
+ int numPartitions = (Integer) state.get("numPartitions");
+ Random rand = (Random) state.get("rand");
+ long nextDocID = (Long) state.get("nextDocID");
+
+ int minInsert = Integer.parseInt(props.getProperty("minInsert"));
+ int maxInsert = Integer.parseInt(props.getProperty("maxInsert"));
+ int numToInsert = rand.nextInt(maxInsert - minInsert) + minInsert;
+
+ int maxSplits = Integer.parseInt(props.getProperty("maxSplits"));
+
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = FileSystem.get(conf);
+
+ String rootDir = "/tmp/shard_bulk/" + dataTableName;
+
+ fs.mkdirs(new Path(rootDir));
+
+ BatchWriter dataWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/data.seq");
+ BatchWriter indexWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/index.seq");
+
+ for (int i = 0; i < numToInsert; i++) {
+ String docID = Insert.insertRandomDocument(nextDocID++, dataWriter, indexWriter, indexTableName, dataTableName, numPartitions, rand);
+ log.debug("Bulk inserting document " + docID);
+ }
+
+ state.set("nextDocID", Long.valueOf(nextDocID));
+
+ dataWriter.close();
+ indexWriter.close();
+
+ sort(state, env, fs, dataTableName, rootDir + "/data.seq", rootDir + "/data_bulk", rootDir + "/data_work", maxSplits);
+ sort(state, env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk", rootDir + "/index_work", maxSplits);
+
+ bulkImport(fs, state, env, dataTableName, rootDir, "data");
+ bulkImport(fs, state, env, indexTableName, rootDir, "index");
+
+ fs.delete(new Path(rootDir), true);
+ }
+
+ private void bulkImport(FileSystem fs, State state, Environment env, String tableName, String rootDir, String prefix) throws Exception {
+ while (true) {
+ String bulkDir = rootDir + "/" + prefix + "_bulk";
+ String failDir = rootDir + "/" + prefix + "_failure";
+ Path failPath = new Path(failDir);
+ fs.delete(failPath, true);
+ fs.mkdirs(failPath);
+ env.getConnector().tableOperations().importDirectory(tableName, bulkDir, failDir, true);
+
+ FileStatus[] failures = fs.listStatus(failPath);
+ if (failures != null && failures.length > 0) {
+ log.warn("Failed to bulk import some files, retrying ");
+
+ for (FileStatus failure : failures) {
+ if (!failure.getPath().getName().endsWith(".seq"))
+ fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName()));
+ else
+ log.debug("Ignoring " + failure.getPath());
+ }
+ sleepUninterruptibly(3, TimeUnit.SECONDS);
+ } else
+ break;
+ }
+ }
+
+ private void sort(State state, Environment env, FileSystem fs, String tableName, String seqFile, String outputDir, String workDir, int maxSplits)
+ throws Exception {
+
+ PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))), false, UTF_8.name());
+
+ Connector conn = env.getConnector();
+
+ Collection<Text> splits = conn.tableOperations().listSplits(tableName, maxSplits);
+ for (Text split : splits)
+ out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
+
+ out.close();
+
+ SortTool sortTool = new SortTool(seqFile, outputDir, workDir + "/splits.txt", splits);
+
+ String[] args = new String[2];
+ args[0] = "-libjars";
+ args[1] = getMapReduceJars();
+
+ if (ToolRunner.run(CachedConfiguration.getInstance(), sortTool, args) != 0) {
+ throw new Exception("Failed to run map/red verify");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/CloneIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/CloneIndex.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/CloneIndex.java
new file mode 100644
index 0000000..c47d2a8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/CloneIndex.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.shard;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class CloneIndex extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+
+ String indexTableName = (String) state.get("indexTableName");
+ String tmpIndexTableName = indexTableName + "_tmp";
+
+ long t1 = System.currentTimeMillis();
+ env.getConnector().tableOperations().flush(indexTableName, null, null, true);
+ long t2 = System.currentTimeMillis();
+ env.getConnector().tableOperations().clone(indexTableName, tmpIndexTableName, false, new HashMap<String,String>(), new HashSet<String>());
+ long t3 = System.currentTimeMillis();
+
+ log.debug("Cloned " + tmpIndexTableName + " from " + indexTableName + " flush: " + (t2 - t1) + "ms clone: " + (t3 - t2) + "ms");
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/ac5b271c/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/Commit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/Commit.java b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/Commit.java
new file mode 100644
index 0000000..06e8b44
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/randomwalk/shard/Commit.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.randomwalk.shard;
+
+import java.util.Properties;
+
+import org.apache.accumulo.testing.core.randomwalk.Environment;
+import org.apache.accumulo.testing.core.randomwalk.State;
+import org.apache.accumulo.testing.core.randomwalk.Test;
+
+public class Commit extends Test {
+
+ @Override
+ public void visit(State state, Environment env, Properties props) throws Exception {
+ env.getMultiTableBatchWriter().flush();
+ log.debug("Committed inserts ");
+ }
+
+}