You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by an...@apache.org on 2016/01/19 22:34:50 UTC
incubator-sentry git commit: SENTRY-906: Add concurrency sentry
client tests. (Anne Yu,
reviewed by Hao Hao) (to run it: -Dsentry.scaletest.oncluster=true,
-Dsentry.host=${SENTRY_HOST}, -Dhive.server2.thrift.bind.host=${HS2_HOST})
Repository: incubator-sentry
Updated Branches:
refs/heads/master 67031139f -> 5a827f6db
SENTRY-906: Add concurrency sentry client tests. (Anne Yu, reviewed by Hao Hao)
(to run it: -Dsentry.scaletest.oncluster=true, -Dsentry.host=${SENTRY_HOST}, -Dhive.server2.thrift.bind.host=${HS2_HOST})
Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/5a827f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/5a827f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/5a827f6d
Branch: refs/heads/master
Commit: 5a827f6db54b1c0d3e310e98c1a35298c23ec114
Parents: 6703113
Author: Anne Yu <an...@cloudera.com>
Authored: Fri Jan 15 14:49:12 2016 -0800
Committer: Anne Yu <an...@cloudera.com>
Committed: Fri Jan 15 14:53:30 2016 -0800
----------------------------------------------------------------------
sentry-tests/sentry-tests-hive/pom.xml | 1 +
.../e2e/dbprovider/TestConcurrentClients.java | 344 +++++++++++++++++++
.../AbstractTestWithStaticConfiguration.java | 86 ++++-
3 files changed, 430 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml
index 98e4752..472cce7 100644
--- a/sentry-tests/sentry-tests-hive/pom.xml
+++ b/sentry-tests/sentry-tests-hive/pom.xml
@@ -462,6 +462,7 @@ limitations under the License.
<include>**/TestDbPrivilegesAtColumnScope.java</include>
<include>**/TestColumnEndToEnd.java</include>
<include>**/TestDbComplexView.java</include>
+ <include>**/TestConcurrentClients.java</include>
</includes>
<argLine>-Dsentry.e2etest.hiveServer2Type=UnmanagedHiveServer2 -Dsentry.e2etest.DFSType=ClusterDFS -Dsentry.e2etest.external.sentry=true</argLine>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
new file mode 100644
index 0000000..d926797
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.dbprovider;
+
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.tests.e2e.hive.AbstractTestWithStaticConfiguration;
+
+import org.apache.sentry.tests.e2e.hive.StaticUserGroup;
+import static org.junit.Assume.assumeTrue;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.RandomStringUtils;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Concurrency tests that exercise the Sentry service from many threads at once,
+ * through both HS2 JDBC clients and the Sentry policy service client.
+ */
+public class TestConcurrentClients extends AbstractTestWithStaticConfiguration {
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(TestConcurrentClients.class);
+
+ private PolicyFile policyFile; // assigned and written out by setup()
+
+ // define scale for tests; every value below can be overridden through the named system property
+ private final int NUM_OF_TABLES = Integer.parseInt(System.getProperty(
+ "sentry.e2e.concurrency.test.tables-per-db", "1")); // tables created per test database
+ private final int NUM_OF_PAR = Integer.parseInt(System.getProperty(
+ "sentry.e2e.concurrency.test.partitions-per-tb", "3")); // partitions added per table
+ private final int NUM_OF_THREADS = Integer.parseInt(System.getProperty(
+ "sentry.e2e.concurrency.test.threads", "30")); // size of the fixed thread pool used by each test
+ private final int NUM_OF_TASKS = Integer.parseInt(System.getProperty(
+ "sentry.e2e.concurrency.test.tasks", "100")); // number of Runnables submitted by each test
+ private final Long HS2_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty(
+ "sentry.e2e.concurrency.test.hs2client.test.time.ms", "10000")); // millis each HS2 task keeps looping
+ private final Long SENTRY_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty(
+ "sentry.e2e.concurrency.test.sentryclient.test.time.ms", "10000")); // millis each sentry client task keeps looping
+
+ private static Map<String, String> privileges = new HashMap<String, String>(); // NOTE(review): not read anywhere in the visible part of this class
+ static {
+ privileges.put("all_db1", "server=server1->db=" + DB1 + "->action=all");
+ }
+
+ @Override
+ @Before
+ public void setup() throws Exception {
+ super.setupAdmin();
+ policyFile = PolicyFile.setAdminOnServer1(ADMINGROUP)
+ .setUserGroupMapping(StaticUserGroup.getStaticMapping());
+ writePolicyFile(policyFile);
+ }
+
+ @BeforeClass
+ public static void setupTestStaticConfiguration() throws Exception {
+ assumeTrue(Boolean.parseBoolean(System.getProperty("sentry.scaletest.oncluster", "false")));
+ useSentryService = true; // configure sentry client
+ clientKerberos = true; // need to get client configuration from testing environments
+ AbstractTestWithStaticConfiguration.setupTestStaticConfiguration();
+ }
+
+ static String randomString( int len ){
+ return RandomStringUtils.random(len, true, false);
+ }
+
+ private void execStmt(Statement stmt, String sql) throws Exception {
+ LOGGER.info("Running [" + sql + "]");
+ stmt.execute(sql);
+ }
+
+  /**
+   * Drops and recreates database {@code db} as {@code user}, then creates
+   * NUM_OF_TABLES partitioned tables named {@code tb + "_" + index} inside it.
+   * SQL failures are logged with their stack trace and not rethrown, so the
+   * calling task keeps running.
+   *
+   * @param user user the JDBC connection is opened as
+   * @param db database to drop (if it exists) and create
+   * @param tb table name prefix
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void createDbTb(String user, String db, String tb) throws Exception {
+    // try-with-resources closes the statement and then the connection even when
+    // createStatement() or statement.close() throws; the previous finally block
+    // leaked the connection in both cases.
+    try (Connection connection = context.createConnection(user);
+        Statement statement = context.createStatement(connection)) {
+      try {
+        execStmt(statement, "DROP DATABASE IF EXISTS " + db + " CASCADE");
+        execStmt(statement, "CREATE DATABASE " + db);
+        execStmt(statement, "USE " + db);
+        for (int i = 0; i < NUM_OF_TABLES; i++) {
+          String tbName = tb + "_" + Integer.toString(i);
+          execStmt(statement, "CREATE TABLE " + tbName + " (a string) PARTITIONED BY (b string)");
+        }
+      } catch (Exception ex) {
+        // pass the throwable itself so the stack trace is kept in the log
+        LOGGER.error("caught exception: ", ex);
+      }
+    }
+  }
+
+  /**
+   * As {@code user}, adds NUM_OF_PAR partitions with random 4-letter values to
+   * each of the NUM_OF_TABLES tables named {@code tb + "_" + index} in {@code db}.
+   * SQL failures are logged with their stack trace and not rethrown.
+   *
+   * @param user user the JDBC connection is opened as
+   * @param db database that holds the tables
+   * @param tb table name prefix
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void createPartition(String user, String db, String tb) throws Exception {
+    // try-with-resources guarantees the connection is closed even when
+    // createStatement() or statement.close() throws.
+    try (Connection connection = context.createConnection(user);
+        Statement statement = context.createStatement(connection)) {
+      try {
+        execStmt(statement, "USE " + db);
+        for (int j = 0; j < NUM_OF_TABLES; j++) {
+          String tbName = tb + "_" + Integer.toString(j);
+          for (int i = 0; i < NUM_OF_PAR; i++) {
+            String randStr = randomString(4);
+            String sql = "ALTER TABLE " + tbName + " ADD IF NOT EXISTS PARTITION (b = '" + randStr + "') ";
+            LOGGER.info("[" + i + "] " + sql);
+            execStmt(statement, sql);
+          }
+        }
+      } catch (Exception ex) {
+        // pass the throwable itself so the stack trace is kept in the log
+        LOGGER.error("caught exception: ", ex);
+      }
+    }
+  }
+
+  /**
+   * As ADMIN1, drops {@code roleName} (a failure of the drop is only logged as
+   * a warning) and then creates it. A failure of the create is logged with its
+   * stack trace and not rethrown.
+   *
+   * @param roleName role to (re)create
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void adminCreateRole(String roleName) throws Exception {
+    // try-with-resources guarantees the connection is closed even when
+    // createStatement() or stmt.close() throws.
+    try (Connection connection = context.createConnection(ADMIN1);
+        Statement stmt = context.createStatement(connection)) {
+      try {
+        execStmt(stmt, "DROP ROLE " + roleName);
+      } catch (Exception ex) {
+        // best effort: the drop is only there to clear a leftover role
+        LOGGER.warn("Role does not exist " + roleName);
+      }
+      // The create always runs after the drop attempt, whether or not the drop succeeded.
+      try {
+        execStmt(stmt, "CREATE ROLE " + roleName);
+      } catch (Exception ex) {
+        LOGGER.error("caught exception when create new role: ", ex);
+      }
+    }
+  }
+
+  /**
+   * Best-effort cleanup as ADMIN1: drops database {@code db} (with CASCADE) and
+   * role {@code roleName}. SQL failures are logged as warnings and not rethrown.
+   *
+   * @param db database to drop if it exists
+   * @param roleName role to drop
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void adminCleanUp(String db, String roleName) throws Exception {
+    // try-with-resources guarantees the connection is closed even when
+    // createStatement() or stmt.close() throws.
+    try (Connection connection = context.createConnection(ADMIN1);
+        Statement stmt = context.createStatement(connection)) {
+      try {
+        execStmt(stmt, "DROP DATABASE IF EXISTS " + db + " CASCADE");
+        execStmt(stmt, "DROP ROLE " + roleName);
+      } catch (Exception ex) {
+        LOGGER.warn("Failed to clean up ", ex);
+      }
+    }
+  }
+
+  /**
+   * As ADMIN1, runs SHOW ROLES and asserts that {@code roleName} is listed
+   * (compared case-insensitively against the "role" column). A failure while
+   * querying is logged with its stack trace; the assertion then fails because
+   * the role was not found.
+   *
+   * @param roleName role expected to exist
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void adminShowRole(String roleName) throws Exception {
+    boolean found = false;
+    // try-with-resources guarantees the connection is closed even when
+    // createStatement() or stmt.close() throws.
+    try (Connection connection = context.createConnection(ADMIN1);
+        Statement stmt = context.createStatement(connection)) {
+      // The result set is now closed as well; it was never closed before.
+      try (ResultSet rs = stmt.executeQuery("SHOW ROLES ")) {
+        while (rs.next()) {
+          // roleName is the receiver so a null column value cannot cause an NPE
+          if (roleName.equalsIgnoreCase(rs.getString("role"))) {
+            LOGGER.info("Found role " + roleName);
+            found = true;
+            break;
+          }
+        }
+      } catch (Exception ex) {
+        LOGGER.error("caught exception when show roles: ", ex);
+      }
+    }
+    assertTrue("failed to detect " + roleName, found);
+  }
+
+  /**
+   * As ADMIN1, grants ALL on each of the NUM_OF_TABLES tables named
+   * {@code test_tb + "_" + index} in {@code test_db} to {@code roleName}, then
+   * grants the role to {@code group}. SQL failures are logged with their stack
+   * trace and not rethrown.
+   *
+   * @param test_db database that holds the tables
+   * @param test_tb table name prefix
+   * @param roleName role receiving the table privileges
+   * @param group group the role is granted to
+   * @throws Exception if the connection or statement cannot be opened or closed
+   */
+  private void adminGrant(String test_db, String test_tb,
+      String roleName, String group) throws Exception {
+    // try-with-resources guarantees the connection is closed even when
+    // createStatement() or stmt.close() throws.
+    try (Connection connection = context.createConnection(ADMIN1);
+        Statement stmt = context.createStatement(connection)) {
+      try {
+        execStmt(stmt, "USE " + test_db);
+        for (int i = 0; i < NUM_OF_TABLES; i++) {
+          String tbName = test_tb + "_" + Integer.toString(i);
+          execStmt(stmt, "GRANT ALL ON TABLE " + tbName + " TO ROLE " + roleName);
+        }
+        execStmt(stmt, "GRANT ROLE " + roleName + " TO GROUP " + group);
+      } catch (Exception ex) {
+        LOGGER.error("caught exception when grant permission and role: ", ex);
+      }
+    }
+  }
+
+  /**
+   * Shared state used by the worker tasks of a concurrency test to report their
+   * outcome: a count of successful tasks, a failure flag and the first
+   * exception that was reported. All accessors are synchronized on the instance.
+   */
+  private final static class TestRuntimeState {
+    private int numSuccess = 0;
+    // volatile so that a read of this field from the enclosing class that
+    // bypasses isFailed() still observes the latest value written by a worker.
+    private volatile boolean failed = false;
+    private Throwable firstException = null;
+
+    /** Marks the run as failed and remembers {@code e} if it is the first failure reported. */
+    public synchronized void setFirstException(Throwable e) {
+      failed = true;
+      if (firstException == null) {
+        firstException = e;
+      }
+    }
+
+    /** Increments the number of successfully completed tasks by one. */
+    public synchronized void setNumSuccess() {
+      numSuccess += 1;
+    }
+
+    /** @return number of tasks that completed successfully so far */
+    public synchronized int getNumSuccess() {
+      return numSuccess;
+    }
+
+    /** @return the first reported failure, or null if none was reported */
+    public synchronized Throwable getFirstException() {
+      return firstException;
+    }
+
+    /** @return true once any task has reported a failure */
+    public synchronized boolean isFailed() {
+      return failed;
+    }
+  }
+
+ /**
+ * Test when concurrent HS2 clients talking to server,
+ * Privileges are correctly created and updated.
+ * @throws Exception
+ */
+ @Test
+ public void testConccurentHS2Client() throws Exception {
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
+ final TestRuntimeState state = new TestRuntimeState();
+
+ for (int i = 0; i < NUM_OF_TASKS; i ++) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ LOGGER.info("Starting tests: create role, show role, create db and tbl, and create partitions");
+ if (state.failed) return;
+ try {
+ Long startTime = System.currentTimeMillis();
+ Long elapsedTime = 0L;
+ while (Long.compare(elapsedTime, HS2_CLIENT_TEST_DURATION_MS) <= 0) {
+ String randStr = randomString(5);
+ String test_role = "test_role_" + randStr;
+ String test_db = "test_db_" + randStr;
+ String test_tb = "test_tb_" + randStr;
+ LOGGER.info("Start to test sentry with hs2 client with role " + test_role);
+ adminCreateRole(test_role);
+ adminShowRole(test_role);
+ createDbTb(ADMIN1, test_db, test_tb);
+ adminGrant(test_db, test_tb, test_role, USERGROUP1);
+ createPartition(USER1_1, test_db, test_tb);
+ adminCleanUp(test_db, test_role);
+ elapsedTime = System.currentTimeMillis() - startTime;
+ LOGGER.info("elapsedTime = " + elapsedTime);
+ }
+ state.setNumSuccess();
+ } catch (Exception e) {
+ LOGGER.error("Exception: " + e);
+ state.setFirstException(e);
+ }
+ }
+ });
+ }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ Thread.sleep(1000); //millisecond
+ }
+ Throwable ex = state.getFirstException();
+ assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed);
+ assertEquals(NUM_OF_TASKS, state.getNumSuccess());
+ }
+
+ /**
+ * Test when concurrent sentry clients talking to sentry server, threads data are synchronized
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentSentryClient() throws Exception {
+ final String HIVE_KEYTAB_PATH =
+ System.getProperty("sentry.e2etest.hive.policyOwnerKeytab");
+ final SentryPolicyServiceClient client = getSentryClient("hive", HIVE_KEYTAB_PATH);
+ ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
+
+ final TestRuntimeState state = new TestRuntimeState();
+ for (int i = 0; i < NUM_OF_TASKS; i ++) {
+ LOGGER.info("Start to test sentry client with task id [" + i + "]");
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (state.failed) {
+ LOGGER.error("found one failed state, abort test from here.");
+ return;
+ }
+ try {
+ String randStr = randomString(5);
+ String test_role = "test_role_" + randStr;
+ LOGGER.info("Start to test role: " + test_role);
+ Long startTime = System.currentTimeMillis();
+ Long elapsedTime = 0L;
+ while (Long.compare(elapsedTime, SENTRY_CLIENT_TEST_DURATION_MS) <= 0) {
+ LOGGER.info("Test role " + test_role + " runs " + elapsedTime + " ms.");
+ client.createRole(ADMIN1, test_role);
+ client.listRoles(ADMIN1);
+ client.grantServerPrivilege(ADMIN1, test_role, "server1", false);
+ client.listAllPrivilegesByRoleName(ADMIN1, test_role);
+ client.dropRole(ADMIN1, test_role);
+ elapsedTime = System.currentTimeMillis() - startTime;
+ }
+ state.setNumSuccess();
+ } catch (Exception e) {
+ LOGGER.error("Sentry Client Testing Exception: ", e);
+ state.setFirstException(e);
+ }
+ }
+ });
+ }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ Thread.sleep(1000); //millisecond
+ }
+ Throwable ex = state.getFirstException();
+ assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed);
+ assertEquals(NUM_OF_TASKS, state.getNumSuccess());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
index dc8c1eb..614856f 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -32,7 +33,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.HashSet;
+import com.google.common.collect.Sets;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
@@ -51,6 +54,7 @@ import org.apache.sentry.policy.db.DBModelAuthorizables;
import org.apache.sentry.provider.db.SimpleDBProviderBackend;
import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.KerberosConfiguration;
import org.apache.sentry.service.thrift.SentryServiceClientFactory;
import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
@@ -72,6 +76,10 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.LoginContext;
+
public abstract class AbstractTestWithStaticConfiguration {
private static final Logger LOGGER = LoggerFactory
.getLogger(AbstractTestWithStaticConfiguration.class);
@@ -137,6 +145,38 @@ public abstract class AbstractTestWithStaticConfiguration {
protected static Context context;
protected final String semanticException = "SemanticException No valid privileges";
+ protected static boolean clientKerberos = false; // when true, getSentryClient(String, String) builds its client under a Kerberos login
+ protected static String REALM = System.getProperty("sentry.service.realm", "EXAMPLE.COM"); // realm appended to principals; defaults to EXAMPLE.COM
+ protected static final String SERVER_KERBEROS_NAME = "sentry/" + SERVER_HOST + "@" + REALM;
+ protected static final String SERVER_KEY_TAB = System.getProperty("sentry.service.server.keytab"); // null when the property is not set
+
+ private static LoginContext clientLoginContext; // assigned by getClientSubject()
+ protected static SentryPolicyServiceClient client; // assigned by getSentryClient(String, String)
+
+ /**
+ * Get sentry client with authenticated Subject
+ * (its security-related attributes(for example, kerberos principal and key)
+ * @param clientShortName
+ * @param clientKeyTabDir
+ * @return client's Subject
+ */
+ public static Subject getClientSubject(String clientShortName, String clientKeyTabDir) {
+ String clientKerberosPrincipal = clientShortName + "@" + REALM;
+ File clientKeyTabFile = new File(clientKeyTabDir);
+ Subject clientSubject = new Subject(false, Sets.newHashSet(
+ new KerberosPrincipal(clientKerberosPrincipal)), new HashSet<Object>(),
+ new HashSet<Object>());
+ try {
+ clientLoginContext = new LoginContext("", clientSubject, null,
+ KerberosConfiguration.createClientConfig(clientKerberosPrincipal, clientKeyTabFile));
+ clientLoginContext.login();
+ } catch (Exception ex) {
+ LOGGER.error("Exception: " + ex);
+ }
+ clientSubject = clientLoginContext.getSubject();
+ return clientSubject;
+ }
+
public static void createContext() throws Exception {
context = new Context(hiveServer, fileSystem,
baseDir, confDir, dataDir, policyFileLocation);
@@ -445,6 +485,51 @@ public abstract class AbstractTestWithStaticConfiguration {
return SentryServiceClientFactory.create(sentryServer.get(0).getConf());
}
+  /**
+   * Returns a Sentry policy service client.
+   * <ul>
+   * <li>When useSentryService is false, delegates to the no-argument getSentryClient().</li>
+   * <li>When clientKerberos is true, fills sentryConf with the server principal,
+   * keytab and RPC settings taken from system properties, logs in through
+   * getClientSubject and creates the client inside Subject.doAs.</li>
+   * <li>Otherwise delegates to the no-argument getSentryClient().</li>
+   * </ul>
+   * The RPC address falls back to the sentry.host value when
+   * sentry.service.server.rpc.address is not set, and the server keytab is only
+   * written to the configuration when sentry.service.server.keytab is set, so
+   * that no null value is handed to the configuration.
+   *
+   * @param clientShortName principal name without the realm
+   * @param clientKeyTabDir path of the keytab file used for the client login
+   * @return sentry client to talk to sentry server
+   * @throws Exception if the client cannot be created
+   */
+  public static SentryPolicyServiceClient getSentryClient(String clientShortName,
+      String clientKeyTabDir) throws Exception {
+    if (!useSentryService) {
+      LOGGER.info("Running on a minicluser env.");
+      return getSentryClient();
+    }
+
+    if (clientKerberos) {
+      if (sentryConf == null) {
+        sentryConf = new Configuration(false);
+      }
+      final String SENTRY_HOST = System.getProperty("sentry.host", SERVER_HOST);
+      final String SERVER_KERBEROS_PRINCIPAL = "sentry/" + SENTRY_HOST + "@" + REALM;
+      sentryConf.set(ServerConfig.PRINCIPAL, SERVER_KERBEROS_PRINCIPAL);
+      if (SERVER_KEY_TAB != null) {
+        sentryConf.set(ServerConfig.KEY_TAB, SERVER_KEY_TAB);
+      }
+      sentryConf.set(ServerConfig.ALLOW_CONNECT, "hive");
+      sentryConf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "false");
+      sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS,
+          System.getProperty("sentry.service.server.rpc.address", SENTRY_HOST));
+      sentryConf.set(ClientConfig.SERVER_RPC_PORT,
+          System.getProperty("sentry.service.server.rpc.port", "8038"));
+      sentryConf.set(ClientConfig.SERVER_RPC_CONN_TIMEOUT, "720000"); //millis
+      Subject clientSubject = getClientSubject(clientShortName, clientKeyTabDir);
+      client = Subject.doAs(clientSubject,
+          new PrivilegedExceptionAction<SentryPolicyServiceClient>() {
+            @Override
+            public SentryPolicyServiceClient run() throws Exception {
+              return SentryServiceClientFactory.create(sentryConf);
+            }
+          });
+    } else {
+      client = getSentryClient();
+    }
+    return client;
+  }
+
@Before
public void setup() throws Exception{
LOGGER.info("AbstractTestStaticConfiguration setup");
@@ -627,5 +712,4 @@ public abstract class AbstractTestWithStaticConfiguration {
LOGGER.info("Running [" + sql + "]");
stmt.execute(sql);
}
-
}