You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/08/31 04:24:27 UTC
[04/10] accumulo git commit: Merge branch '1.7' into 1.8
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d28a3ee3/test/src/main/java/org/apache/accumulo/test/functional/PermissionsIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/PermissionsIT.java
index 2fc256b,0000000..c7fc709
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/PermissionsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/PermissionsIT.java
@@@ -1,707 -1,0 +1,710 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.security.SecurityErrorCode;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.SystemPermission;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
++import org.apache.accumulo.test.categories.MiniClusterOnlyTest;
+import org.apache.hadoop.io.Text;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
++import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// This test verifies the default permissions so a clean instance must be used. A shared instance might
+// not be representative of a fresh installation.
++@Category(MiniClusterOnlyTest.class)
+public class PermissionsIT extends AccumuloClusterHarness {
+ private static final Logger log = LoggerFactory.getLogger(PermissionsIT.class);
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 60;
+ }
+
+ @Before
+ public void limitToMini() throws Exception {
+ Assume.assumeTrue(ClusterType.MINI == getClusterType());
+ Connector c = getConnector();
+ Set<String> users = c.securityOperations().listLocalUsers();
+ ClusterUser user = getUser(0);
+ if (users.contains(user.getPrincipal())) {
+ c.securityOperations().dropLocalUser(user.getPrincipal());
+ }
+ }
+
+ private void loginAs(ClusterUser user) throws IOException {
+ // Force a re-login as the provided user
+ user.getToken();
+ }
+
+ @Test
+ public void systemPermissionsTest() throws Exception {
+ ClusterUser testUser = getUser(0), rootUser = getAdminUser();
+
+ // verify that the test is being run by root
+ Connector c = getConnector();
+ verifyHasOnlyTheseSystemPermissions(c, c.whoami(), SystemPermission.values());
+
+ // create the test user
+ String principal = testUser.getPrincipal();
+ AuthenticationToken token = testUser.getToken();
+ PasswordToken passwordToken = null;
+ if (token instanceof PasswordToken) {
+ passwordToken = (PasswordToken) token;
+ }
+ loginAs(rootUser);
+ c.securityOperations().createLocalUser(principal, passwordToken);
+ loginAs(testUser);
+ Connector test_user_conn = c.getInstance().getConnector(principal, token);
+ loginAs(rootUser);
+ verifyHasNoSystemPermissions(c, principal, SystemPermission.values());
+
+ // test each permission
+ for (SystemPermission perm : SystemPermission.values()) {
+ log.debug("Verifying the " + perm + " permission");
+
+ // test permission before and after granting it
+ String tableNamePrefix = getUniqueNames(1)[0];
+ testMissingSystemPermission(tableNamePrefix, c, rootUser, test_user_conn, testUser, perm);
+ loginAs(rootUser);
+ c.securityOperations().grantSystemPermission(principal, perm);
+ verifyHasOnlyTheseSystemPermissions(c, principal, perm);
+ testGrantedSystemPermission(tableNamePrefix, c, rootUser, test_user_conn, testUser, perm);
+ loginAs(rootUser);
+ c.securityOperations().revokeSystemPermission(principal, perm);
+ verifyHasNoSystemPermissions(c, principal, perm);
+ }
+ }
+
+ static Map<String,String> map(Iterable<Entry<String,String>> i) {
+ Map<String,String> result = new HashMap<>();
+ for (Entry<String,String> e : i) {
+ result.put(e.getKey(), e.getValue());
+ }
+ return result;
+ }
+
+ private void testMissingSystemPermission(String tableNamePrefix, Connector root_conn, ClusterUser rootUser, Connector test_user_conn, ClusterUser testUser,
+ SystemPermission perm) throws Exception {
+ String tableName, user, password = "password", namespace;
+ boolean passwordBased = testUser.getPassword() != null;
+ log.debug("Confirming that the lack of the " + perm + " permission properly restricts the user");
+
+ // test permission prior to granting it
+ switch (perm) {
+ case CREATE_TABLE:
+ tableName = tableNamePrefix + "__CREATE_TABLE_WITHOUT_PERM_TEST__";
+ try {
+ loginAs(testUser);
+ test_user_conn.tableOperations().create(tableName);
+ throw new IllegalStateException("Should NOT be able to create a table");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || root_conn.tableOperations().list().contains(tableName))
+ throw e;
+ }
+ break;
+ case DROP_TABLE:
+ tableName = tableNamePrefix + "__DROP_TABLE_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.tableOperations().create(tableName);
+ try {
+ loginAs(testUser);
+ test_user_conn.tableOperations().delete(tableName);
+ throw new IllegalStateException("Should NOT be able to delete a table");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.tableOperations().list().contains(tableName))
+ throw e;
+ }
+ break;
+ case ALTER_TABLE:
+ tableName = tableNamePrefix + "__ALTER_TABLE_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.tableOperations().create(tableName);
+ try {
+ loginAs(testUser);
+ test_user_conn.tableOperations().setProperty(tableName, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ throw new IllegalStateException("Should NOT be able to set a table property");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED
+ || map(root_conn.tableOperations().getProperties(tableName)).get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw e;
+ }
+ loginAs(rootUser);
+ root_conn.tableOperations().setProperty(tableName, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ try {
+ loginAs(testUser);
+ test_user_conn.tableOperations().removeProperty(tableName, Property.TABLE_BLOOM_ERRORRATE.getKey());
+ throw new IllegalStateException("Should NOT be able to remove a table property");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED
+ || !map(root_conn.tableOperations().getProperties(tableName)).get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw e;
+ }
+ String table2 = tableName + "2";
+ try {
+ loginAs(testUser);
+ test_user_conn.tableOperations().rename(tableName, table2);
+ throw new IllegalStateException("Should NOT be able to rename a table");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.tableOperations().list().contains(tableName)
+ || root_conn.tableOperations().list().contains(table2))
+ throw e;
+ }
+ break;
+ case CREATE_USER:
+ user = "__CREATE_USER_WITHOUT_PERM_TEST__";
+ try {
+ loginAs(testUser);
+ test_user_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ throw new IllegalStateException("Should NOT be able to create a user");
+ } catch (AccumuloSecurityException e) {
+ AuthenticationToken userToken = testUser.getToken();
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED
+ || (userToken instanceof PasswordToken && root_conn.securityOperations().authenticateUser(user, userToken)))
+ throw e;
+ }
+ break;
+ case DROP_USER:
+ user = "__DROP_USER_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ try {
+ loginAs(testUser);
+ test_user_conn.securityOperations().dropLocalUser(user);
+ throw new IllegalStateException("Should NOT be able to delete a user");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.securityOperations().listLocalUsers().contains(user)) {
+ log.info("Failed to authenticate as " + user);
+ throw e;
+ }
+ }
+ break;
+ case ALTER_USER:
+ user = "__ALTER_USER_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ try {
+ loginAs(testUser);
+ test_user_conn.securityOperations().changeUserAuthorizations(user, new Authorizations("A", "B"));
+ throw new IllegalStateException("Should NOT be able to alter a user");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.securityOperations().getUserAuthorizations(user).isEmpty())
+ throw e;
+ }
+ break;
+ case SYSTEM:
+ // test for system permission would go here
+ break;
+ case CREATE_NAMESPACE:
+ namespace = "__CREATE_NAMESPACE_WITHOUT_PERM_TEST__";
+ try {
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().create(namespace);
+ throw new IllegalStateException("Should NOT be able to create a namespace");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || root_conn.namespaceOperations().list().contains(namespace))
+ throw e;
+ }
+ break;
+ case DROP_NAMESPACE:
+ namespace = "__DROP_NAMESPACE_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.namespaceOperations().create(namespace);
+ try {
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().delete(namespace);
+ throw new IllegalStateException("Should NOT be able to delete a namespace");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.namespaceOperations().list().contains(namespace))
+ throw e;
+ }
+ break;
+ case ALTER_NAMESPACE:
+ namespace = "__ALTER_NAMESPACE_WITHOUT_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.namespaceOperations().create(namespace);
+ try {
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().setProperty(namespace, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ throw new IllegalStateException("Should NOT be able to set a namespace property");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED
+ || map(root_conn.namespaceOperations().getProperties(namespace)).get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw e;
+ }
+ loginAs(rootUser);
+ root_conn.namespaceOperations().setProperty(namespace, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ try {
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().removeProperty(namespace, Property.TABLE_BLOOM_ERRORRATE.getKey());
+ throw new IllegalStateException("Should NOT be able to remove a namespace property");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED
+ || !map(root_conn.namespaceOperations().getProperties(namespace)).get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw e;
+ }
+ String namespace2 = namespace + "2";
+ try {
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().rename(namespace, namespace2);
+ throw new IllegalStateException("Should NOT be able to rename a namespace");
+ } catch (AccumuloSecurityException e) {
+ loginAs(rootUser);
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED || !root_conn.namespaceOperations().list().contains(namespace)
+ || root_conn.namespaceOperations().list().contains(namespace2))
+ throw e;
+ }
+ break;
+ case OBTAIN_DELEGATION_TOKEN:
+ ClientConfiguration clientConf = cluster.getClientConfig();
+ if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ // TODO Try to obtain a delegation token without the permission
+ }
+ break;
+ case GRANT:
+ loginAs(testUser);
+ try {
+ test_user_conn.securityOperations().grantSystemPermission(testUser.getPrincipal(), SystemPermission.GRANT);
+ throw new IllegalStateException("Should NOT be able to grant System.GRANT to yourself");
+ } catch (AccumuloSecurityException e) {
+ // Expected
+ loginAs(rootUser);
+ assertFalse(root_conn.securityOperations().hasSystemPermission(testUser.getPrincipal(), SystemPermission.GRANT));
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized System Permission: " + perm);
+ }
+ }
+
+ private void testGrantedSystemPermission(String tableNamePrefix, Connector root_conn, ClusterUser rootUser, Connector test_user_conn, ClusterUser testUser,
+ SystemPermission perm) throws Exception {
+ String tableName, user, password = "password", namespace;
+ boolean passwordBased = testUser.getPassword() != null;
+ log.debug("Confirming that the presence of the " + perm + " permission properly permits the user");
+
+ // test permission after granting it
+ switch (perm) {
+ case CREATE_TABLE:
+ tableName = tableNamePrefix + "__CREATE_TABLE_WITH_PERM_TEST__";
+ loginAs(testUser);
+ test_user_conn.tableOperations().create(tableName);
+ loginAs(rootUser);
+ if (!root_conn.tableOperations().list().contains(tableName))
+ throw new IllegalStateException("Should be able to create a table");
+ break;
+ case DROP_TABLE:
+ tableName = tableNamePrefix + "__DROP_TABLE_WITH_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.tableOperations().create(tableName);
+ loginAs(testUser);
+ test_user_conn.tableOperations().delete(tableName);
+ loginAs(rootUser);
+ if (root_conn.tableOperations().list().contains(tableName))
+ throw new IllegalStateException("Should be able to delete a table");
+ break;
+ case ALTER_TABLE:
+ tableName = tableNamePrefix + "__ALTER_TABLE_WITH_PERM_TEST__";
+ String table2 = tableName + "2";
+ loginAs(rootUser);
+ root_conn.tableOperations().create(tableName);
+ loginAs(testUser);
+ test_user_conn.tableOperations().setProperty(tableName, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ loginAs(rootUser);
+ Map<String,String> properties = map(root_conn.tableOperations().getProperties(tableName));
+ if (!properties.get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw new IllegalStateException("Should be able to set a table property");
+ loginAs(testUser);
+ test_user_conn.tableOperations().removeProperty(tableName, Property.TABLE_BLOOM_ERRORRATE.getKey());
+ loginAs(rootUser);
+ properties = map(root_conn.tableOperations().getProperties(tableName));
+ if (properties.get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw new IllegalStateException("Should be able to remove a table property");
+ loginAs(testUser);
+ test_user_conn.tableOperations().rename(tableName, table2);
+ loginAs(rootUser);
+ if (root_conn.tableOperations().list().contains(tableName) || !root_conn.tableOperations().list().contains(table2))
+ throw new IllegalStateException("Should be able to rename a table");
+ break;
+ case CREATE_USER:
+ user = "__CREATE_USER_WITH_PERM_TEST__";
+ loginAs(testUser);
+ test_user_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ loginAs(rootUser);
+ if (passwordBased && !root_conn.securityOperations().authenticateUser(user, new PasswordToken(password)))
+ throw new IllegalStateException("Should be able to create a user");
+ break;
+ case DROP_USER:
+ user = "__DROP_USER_WITH_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ loginAs(testUser);
+ test_user_conn.securityOperations().dropLocalUser(user);
+ loginAs(rootUser);
+ if (passwordBased && root_conn.securityOperations().authenticateUser(user, new PasswordToken(password)))
+ throw new IllegalStateException("Should be able to delete a user");
+ break;
+ case ALTER_USER:
+ user = "__ALTER_USER_WITH_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.securityOperations().createLocalUser(user, (passwordBased ? new PasswordToken(password) : null));
+ loginAs(testUser);
+ test_user_conn.securityOperations().changeUserAuthorizations(user, new Authorizations("A", "B"));
+ loginAs(rootUser);
+ if (root_conn.securityOperations().getUserAuthorizations(user).isEmpty())
+ throw new IllegalStateException("Should be able to alter a user");
+ break;
+ case SYSTEM:
+ // test for system permission would go here
+ break;
+ case CREATE_NAMESPACE:
+ namespace = "__CREATE_NAMESPACE_WITH_PERM_TEST__";
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().create(namespace);
+ loginAs(rootUser);
+ if (!root_conn.namespaceOperations().list().contains(namespace))
+ throw new IllegalStateException("Should be able to create a namespace");
+ break;
+ case DROP_NAMESPACE:
+ namespace = "__DROP_NAMESPACE_WITH_PERM_TEST__";
+ loginAs(rootUser);
+ root_conn.namespaceOperations().create(namespace);
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().delete(namespace);
+ loginAs(rootUser);
+ if (root_conn.namespaceOperations().list().contains(namespace))
+ throw new IllegalStateException("Should be able to delete a namespace");
+ break;
+ case ALTER_NAMESPACE:
+ namespace = "__ALTER_NAMESPACE_WITH_PERM_TEST__";
+ String namespace2 = namespace + "2";
+ loginAs(rootUser);
+ root_conn.namespaceOperations().create(namespace);
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().setProperty(namespace, Property.TABLE_BLOOM_ERRORRATE.getKey(), "003.14159%");
+ loginAs(rootUser);
+ Map<String,String> propies = map(root_conn.namespaceOperations().getProperties(namespace));
+ if (!propies.get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw new IllegalStateException("Should be able to set a table property");
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().removeProperty(namespace, Property.TABLE_BLOOM_ERRORRATE.getKey());
+ loginAs(rootUser);
+ propies = map(root_conn.namespaceOperations().getProperties(namespace));
+ if (propies.get(Property.TABLE_BLOOM_ERRORRATE.getKey()).equals("003.14159%"))
+ throw new IllegalStateException("Should be able to remove a table property");
+ loginAs(testUser);
+ test_user_conn.namespaceOperations().rename(namespace, namespace2);
+ loginAs(rootUser);
+ if (root_conn.namespaceOperations().list().contains(namespace) || !root_conn.namespaceOperations().list().contains(namespace2))
+ throw new IllegalStateException("Should be able to rename a table");
+ break;
+ case OBTAIN_DELEGATION_TOKEN:
+ ClientConfiguration clientConf = cluster.getClientConfig();
+ if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ // TODO Try to obtain a delegation token with the permission
+ }
+ break;
+ case GRANT:
+ loginAs(rootUser);
+ root_conn.securityOperations().grantSystemPermission(testUser.getPrincipal(), SystemPermission.GRANT);
+ loginAs(testUser);
+ test_user_conn.securityOperations().grantSystemPermission(testUser.getPrincipal(), SystemPermission.CREATE_TABLE);
+ loginAs(rootUser);
+ assertTrue("Test user should have CREATE_TABLE",
+ root_conn.securityOperations().hasSystemPermission(testUser.getPrincipal(), SystemPermission.CREATE_TABLE));
+ assertTrue("Test user should have GRANT", root_conn.securityOperations().hasSystemPermission(testUser.getPrincipal(), SystemPermission.GRANT));
+ root_conn.securityOperations().revokeSystemPermission(testUser.getPrincipal(), SystemPermission.CREATE_TABLE);
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized System Permission: " + perm);
+ }
+ }
+
+ private void verifyHasOnlyTheseSystemPermissions(Connector root_conn, String user, SystemPermission... perms) throws AccumuloException,
+ AccumuloSecurityException {
+ List<SystemPermission> permList = Arrays.asList(perms);
+ for (SystemPermission p : SystemPermission.values()) {
+ if (permList.contains(p)) {
+ // should have these
+ if (!root_conn.securityOperations().hasSystemPermission(user, p))
+ throw new IllegalStateException(user + " SHOULD have system permission " + p);
+ } else {
+ // should not have these
+ if (root_conn.securityOperations().hasSystemPermission(user, p))
+ throw new IllegalStateException(user + " SHOULD NOT have system permission " + p);
+ }
+ }
+ }
+
+ private void verifyHasNoSystemPermissions(Connector root_conn, String user, SystemPermission... perms) throws AccumuloException, AccumuloSecurityException {
+ for (SystemPermission p : perms)
+ if (root_conn.securityOperations().hasSystemPermission(user, p))
+ throw new IllegalStateException(user + " SHOULD NOT have system permission " + p);
+ }
+
+ @Test
+ public void tablePermissionTest() throws Exception {
+ // create the test user
+ ClusterUser testUser = getUser(0), rootUser = getAdminUser();
+
+ String principal = testUser.getPrincipal();
+ AuthenticationToken token = testUser.getToken();
+ PasswordToken passwordToken = null;
+ if (token instanceof PasswordToken) {
+ passwordToken = (PasswordToken) token;
+ }
+ loginAs(rootUser);
+ Connector c = getConnector();
+ c.securityOperations().createLocalUser(principal, passwordToken);
+ loginAs(testUser);
+ Connector test_user_conn = c.getInstance().getConnector(principal, token);
+
+ // check for read-only access to metadata table
+ loginAs(rootUser);
+ verifyHasOnlyTheseTablePermissions(c, c.whoami(), MetadataTable.NAME, TablePermission.READ, TablePermission.ALTER_TABLE);
+ verifyHasOnlyTheseTablePermissions(c, principal, MetadataTable.NAME, TablePermission.READ);
+ String tableName = getUniqueNames(1)[0] + "__TABLE_PERMISSION_TEST__";
+
+ // test each permission
+ for (TablePermission perm : TablePermission.values()) {
+ log.debug("Verifying the " + perm + " permission");
+
+ // test permission before and after granting it
+ createTestTable(c, principal, tableName);
+ loginAs(testUser);
+ testMissingTablePermission(test_user_conn, testUser, perm, tableName);
+ loginAs(rootUser);
+ c.securityOperations().grantTablePermission(principal, tableName, perm);
+ verifyHasOnlyTheseTablePermissions(c, principal, tableName, perm);
+ loginAs(testUser);
+ testGrantedTablePermission(test_user_conn, testUser, perm, tableName);
+
+ loginAs(rootUser);
+ createTestTable(c, principal, tableName);
+ c.securityOperations().revokeTablePermission(principal, tableName, perm);
+ verifyHasNoTablePermissions(c, principal, tableName, perm);
+ }
+ }
+
+ private void createTestTable(Connector c, String testUser, String tableName) throws Exception, MutationsRejectedException {
+ if (!c.tableOperations().exists(tableName)) {
+ // create the test table
+ c.tableOperations().create(tableName);
+ // put in some initial data
+ BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig());
+ Mutation m = new Mutation(new Text("row"));
+ m.put(new Text("cf"), new Text("cq"), new Value("val".getBytes()));
+ writer.addMutation(m);
+ writer.close();
+
+ // verify proper permissions for creator and test user
+ verifyHasOnlyTheseTablePermissions(c, c.whoami(), tableName, TablePermission.values());
+ verifyHasNoTablePermissions(c, testUser, tableName, TablePermission.values());
+
+ }
+ }
+
+ private void testMissingTablePermission(Connector test_user_conn, ClusterUser testUser, TablePermission perm, String tableName) throws Exception {
+ Scanner scanner;
+ BatchWriter writer;
+ Mutation m;
+ log.debug("Confirming that the lack of the " + perm + " permission properly restricts the user");
+
+ // test permission prior to granting it
+ switch (perm) {
+ case READ:
+ try {
+ scanner = test_user_conn.createScanner(tableName, Authorizations.EMPTY);
+ int i = 0;
+ for (Entry<Key,Value> entry : scanner)
+ i += 1 + entry.getKey().getRowData().length();
+ if (i != 0)
+ throw new IllegalStateException("Should NOT be able to read from the table");
+ } catch (RuntimeException e) {
+ AccumuloSecurityException se = (AccumuloSecurityException) e.getCause();
+ if (se.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED)
+ throw se;
+ }
+ break;
+ case WRITE:
+ try {
+ writer = test_user_conn.createBatchWriter(tableName, new BatchWriterConfig());
+ m = new Mutation(new Text("row"));
+ m.put(new Text("a"), new Text("b"), new Value("c".getBytes()));
+ writer.addMutation(m);
+ try {
+ writer.close();
+ } catch (MutationsRejectedException e1) {
+ if (e1.getSecurityErrorCodes().size() > 0)
+ throw new AccumuloSecurityException(test_user_conn.whoami(), org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode.PERMISSION_DENIED, e1);
+ }
+ throw new IllegalStateException("Should NOT be able to write to a table");
+ } catch (AccumuloSecurityException e) {
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED)
+ throw e;
+ }
+ break;
+ case BULK_IMPORT:
+ // test for bulk import permission would go here
+ break;
+ case ALTER_TABLE:
+ Map<String,Set<Text>> groups = new HashMap<>();
+ groups.put("tgroup", new HashSet<>(Arrays.asList(new Text("t1"), new Text("t2"))));
+ try {
+ test_user_conn.tableOperations().setLocalityGroups(tableName, groups);
+ throw new IllegalStateException("User should not be able to set locality groups");
+ } catch (AccumuloSecurityException e) {
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED)
+ throw e;
+ }
+ break;
+ case DROP_TABLE:
+ try {
+ test_user_conn.tableOperations().delete(tableName);
+ throw new IllegalStateException("User should not be able delete the table");
+ } catch (AccumuloSecurityException e) {
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED)
+ throw e;
+ }
+ break;
+ case GRANT:
+ try {
+ test_user_conn.securityOperations().grantTablePermission(getAdminPrincipal(), tableName, TablePermission.GRANT);
+ throw new IllegalStateException("User should not be able grant permissions");
+ } catch (AccumuloSecurityException e) {
+ if (e.getSecurityErrorCode() != SecurityErrorCode.PERMISSION_DENIED)
+ throw e;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized table Permission: " + perm);
+ }
+ }
+
+ private void testGrantedTablePermission(Connector test_user_conn, ClusterUser normalUser, TablePermission perm, String tableName) throws AccumuloException,
+ TableExistsException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
+ Scanner scanner;
+ BatchWriter writer;
+ Mutation m;
+ log.debug("Confirming that the presence of the " + perm + " permission properly permits the user");
+
+ // test permission after granting it
+ switch (perm) {
+ case READ:
+ scanner = test_user_conn.createScanner(tableName, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext())
+ iter.next();
+ break;
+ case WRITE:
+ writer = test_user_conn.createBatchWriter(tableName, new BatchWriterConfig());
+ m = new Mutation(new Text("row"));
+ m.put(new Text("a"), new Text("b"), new Value("c".getBytes()));
+ writer.addMutation(m);
+ writer.close();
+ break;
+ case BULK_IMPORT:
+ // test for bulk import permission would go here
+ break;
+ case ALTER_TABLE:
+ Map<String,Set<Text>> groups = new HashMap<>();
+ groups.put("tgroup", new HashSet<>(Arrays.asList(new Text("t1"), new Text("t2"))));
+ break;
+ case DROP_TABLE:
+ test_user_conn.tableOperations().delete(tableName);
+ break;
+ case GRANT:
+ test_user_conn.securityOperations().grantTablePermission(getAdminPrincipal(), tableName, TablePermission.GRANT);
+ break;
+ default:
+ throw new IllegalArgumentException("Unrecognized table Permission: " + perm);
+ }
+ }
+
+ private void verifyHasOnlyTheseTablePermissions(Connector root_conn, String user, String table, TablePermission... perms) throws AccumuloException,
+ AccumuloSecurityException {
+ List<TablePermission> permList = Arrays.asList(perms);
+ for (TablePermission p : TablePermission.values()) {
+ if (permList.contains(p)) {
+ // should have these
+ if (!root_conn.securityOperations().hasTablePermission(user, table, p))
+ throw new IllegalStateException(user + " SHOULD have table permission " + p + " for table " + table);
+ } else {
+ // should not have these
+ if (root_conn.securityOperations().hasTablePermission(user, table, p))
+ throw new IllegalStateException(user + " SHOULD NOT have table permission " + p + " for table " + table);
+ }
+ }
+ }
+
+ private void verifyHasNoTablePermissions(Connector root_conn, String user, String table, TablePermission... perms) throws AccumuloException,
+ AccumuloSecurityException {
+ for (TablePermission p : perms)
+ if (root_conn.securityOperations().hasTablePermission(user, table, p))
+ throw new IllegalStateException(user + " SHOULD NOT have table permission " + p + " for table " + table);
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d28a3ee3/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
index 504a5d9,0000000..22fbf18
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/TableIT.java
@@@ -1,107 -1,0 +1,110 @@@
+package org.apache.accumulo.test.functional;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+
+import org.apache.accumulo.cluster.AccumuloCluster;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.test.VerifyIngest;
++import org.apache.accumulo.test.categories.MiniClusterOnlyTest;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.hamcrest.CoreMatchers;
+import org.junit.Assume;
+import org.junit.Test;
++import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Iterators;
+
++@Category(MiniClusterOnlyTest.class)
+public class TableIT extends AccumuloClusterHarness {
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 2 * 60;
+ }
+
+ @Test
+ public void test() throws Exception {
+ Assume.assumeThat(getClusterType(), CoreMatchers.is(ClusterType.MINI));
+
+ AccumuloCluster cluster = getCluster();
+ MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
+ String rootPath = mac.getConfig().getDir().getAbsolutePath();
+
+ Connector c = getConnector();
+ TableOperations to = c.tableOperations();
+ String tableName = getUniqueNames(1)[0];
+ to.create(tableName);
+
+ TestIngest.Opts opts = new TestIngest.Opts();
+ VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+ ClientConfiguration clientConfig = getCluster().getClientConfig();
+ if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) {
+ opts.updateKerberosCredentials(clientConfig);
+ vopts.updateKerberosCredentials(clientConfig);
+ } else {
+ opts.setPrincipal(getAdminPrincipal());
+ vopts.setPrincipal(getAdminPrincipal());
+ }
+
+ opts.setTableName(tableName);
+ TestIngest.ingest(c, opts, new BatchWriterOpts());
+ to.flush(tableName, null, null, true);
+ vopts.setTableName(tableName);
+ VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+ String id = to.tableIdMap().get(tableName);
+ Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(new KeyExtent(id, null, null).toMetadataRange());
+ s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ assertTrue(Iterators.size(s.iterator()) > 0);
+
+ FileSystem fs = getCluster().getFileSystem();
+ assertTrue(fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length > 0);
+ to.delete(tableName);
+ assertEquals(0, Iterators.size(s.iterator()));
+ try {
+ assertEquals(0, fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id)).length);
+ } catch (FileNotFoundException ex) {
+ // that's fine, too
+ }
+ assertNull(to.tableIdMap().get(tableName));
+ to.create(tableName);
+ TestIngest.ingest(c, opts, new BatchWriterOpts());
+ VerifyIngest.verifyIngest(c, vopts, new ScannerOpts());
+ to.delete(tableName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d28a3ee3/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
index 4559195,0000000..32df894
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/KerberosReplicationIT.java
@@@ -1,243 -1,0 +1,246 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.cluster.ClusterUser;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.harness.AccumuloITBase;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.MiniClusterHarness;
+import org.apache.accumulo.harness.TestingKdc;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
+import org.apache.accumulo.server.replication.ReplicaSystemFactory;
++import org.apache.accumulo.test.categories.MiniClusterOnlyTest;
+import org.apache.accumulo.test.functional.KerberosIT;
+import org.apache.accumulo.tserver.TabletServer;
+import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+/**
+ * Ensure that replication occurs using keytabs instead of password (not to mention SASL)
+ */
++@Category(MiniClusterOnlyTest.class)
+public class KerberosReplicationIT extends AccumuloITBase {
+ private static final Logger log = LoggerFactory.getLogger(KerberosIT.class);
+
+ private static TestingKdc kdc;
+ private static String krbEnabledForITs = null;
+ private static ClusterUser rootUser;
+
+ @BeforeClass
+ public static void startKdc() throws Exception {
+ kdc = new TestingKdc();
+ kdc.start();
+ krbEnabledForITs = System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION);
+ if (null == krbEnabledForITs || !Boolean.parseBoolean(krbEnabledForITs)) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, "true");
+ }
+ rootUser = kdc.getRootUser();
+ }
+
+ @AfterClass
+ public static void stopKdc() throws Exception {
+ if (null != kdc) {
+ kdc.stop();
+ }
+ if (null != krbEnabledForITs) {
+ System.setProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION, krbEnabledForITs);
+ }
+ }
+
+ private MiniAccumuloClusterImpl primary, peer;
+ private String PRIMARY_NAME = "primary", PEER_NAME = "peer";
+
+ @Override
+ protected int defaultTimeoutSeconds() {
+ return 60 * 3;
+ }
+
+ private MiniClusterConfigurationCallback getConfigCallback(final String name) {
+ return new MiniClusterConfigurationCallback() {
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
+ cfg.setNumTservers(1);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ cfg.setProperty(Property.REPLICATION_NAME, name);
+ cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ cfg.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
+ coreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ coreSite.set("fs.defaultFS", "file:///");
+ }
+ };
+ }
+
+ @Before
+ public void setup() throws Exception {
+ MiniClusterHarness harness = new MiniClusterHarness();
+
+ // Create a primary and a peer instance, both with the same "root" user
+ primary = harness.create(getClass().getName(), testName.getMethodName(), new PasswordToken("unused"), getConfigCallback(PRIMARY_NAME), kdc);
+ primary.start();
+
+ peer = harness.create(getClass().getName(), testName.getMethodName() + "_peer", new PasswordToken("unused"), getConfigCallback(PEER_NAME), kdc);
+ peer.start();
+
+ // Enable kerberos auth
+ Configuration conf = new Configuration(false);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (null != peer) {
+ peer.stop();
+ }
+ if (null != primary) {
+ primary.stop();
+ }
+ UserGroupInformation.setConfiguration(new Configuration(false));
+ }
+
+ @Test
+ public void dataReplicatedToCorrectTable() throws Exception {
+ // Login as the root user
+ final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(rootUser.getPrincipal(), rootUser.getKeytab().toURI().toString());
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ log.info("testing {}", ugi);
+ final KerberosToken token = new KerberosToken();
+ final Connector primaryConn = primary.getConnector(rootUser.getPrincipal(), token);
+ final Connector peerConn = peer.getConnector(rootUser.getPrincipal(), token);
+
+ ClusterUser replicationUser = kdc.getClientPrincipal(0);
+
+ // Create user for replication to the peer
+ peerConn.securityOperations().createLocalUser(replicationUser.getPrincipal(), null);
+
+ primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + PEER_NAME, replicationUser.getPrincipal());
+ primaryConn.instanceOperations().setProperty(Property.REPLICATION_PEER_KEYTAB.getKey() + PEER_NAME, replicationUser.getKeytab().getAbsolutePath());
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ primaryConn.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + PEER_NAME,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peerConn.getInstance().getInstanceName(), peerConn.getInstance().getZooKeepers())));
+
+ String primaryTable1 = "primary", peerTable1 = "peer";
+
+ // Create tables
+ primaryConn.tableOperations().create(primaryTable1);
+ String masterTableId1 = primaryConn.tableOperations().tableIdMap().get(primaryTable1);
+ Assert.assertNotNull(masterTableId1);
+
+ peerConn.tableOperations().create(peerTable1);
+ String peerTableId1 = peerConn.tableOperations().tableIdMap().get(peerTable1);
+ Assert.assertNotNull(peerTableId1);
+
+ // Grant write permission
+ peerConn.securityOperations().grantTablePermission(replicationUser.getPrincipal(), peerTable1, TablePermission.WRITE);
+
+ // Replicate this table to the peerClusterName in a table with the peerTableId table id
+ primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION.getKey(), "true");
+ primaryConn.tableOperations().setProperty(primaryTable1, Property.TABLE_REPLICATION_TARGET.getKey() + PEER_NAME, peerTableId1);
+
+ // Write some data to table1
+ BatchWriter bw = primaryConn.createBatchWriter(primaryTable1, new BatchWriterConfig());
+ long masterTable1Records = 0l;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(primaryTable1 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable1Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to primary cluster");
+
+ Set<String> filesFor1 = primaryConn.replicationOperations().referencedFiles(primaryTable1);
+
+ // Restart the tserver to force a close on the WAL
+ for (ProcessReference proc : primary.getProcesses().get(ServerType.TABLET_SERVER)) {
+ primary.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ primary.exec(TabletServer.class);
+
+ log.info("Restarted the tserver");
+
+ // Read the data -- the tserver is back up and running and tablets are assigned
+ Iterators.size(primaryConn.createScanner(primaryTable1, Authorizations.EMPTY).iterator());
+
+ // Wait for both tables to be replicated
+ log.info("Waiting for {} for {}", filesFor1, primaryTable1);
+ primaryConn.replicationOperations().drain(primaryTable1, filesFor1);
+
+ long countTable = 0l;
+ for (Entry<Key,Value> entry : peerConn.createScanner(peerTable1, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(primaryTable1));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable1);
+ Assert.assertEquals(masterTable1Records, countTable);
+
+ return null;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d28a3ee3/trace/pom.xml
----------------------------------------------------------------------