You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sentry.apache.org by an...@apache.org on 2016/01/19 22:34:50 UTC

incubator-sentry git commit: SENTRY-906: Add concurrency sentry client tests. (Anne Yu, reviewed by Hao Hao) (to run it: -Dsentry.scaletest.oncluster=true, -Dsentry.host=${SENTRY_HOST}, -Dhive.server2.thrift.bind.host=${HS2_HOST})

Repository: incubator-sentry
Updated Branches:
  refs/heads/master 67031139f -> 5a827f6db


SENTRY-906: Add concurrency sentry client tests. (Anne Yu, reviewed by Hao Hao)
 (to run it: -Dsentry.scaletest.oncluster=true, -Dsentry.host=${SENTRY_HOST}, -Dhive.server2.thrift.bind.host=${HS2_HOST})


Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/5a827f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/5a827f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/5a827f6d

Branch: refs/heads/master
Commit: 5a827f6db54b1c0d3e310e98c1a35298c23ec114
Parents: 6703113
Author: Anne Yu <an...@cloudera.com>
Authored: Fri Jan 15 14:49:12 2016 -0800
Committer: Anne Yu <an...@cloudera.com>
Committed: Fri Jan 15 14:53:30 2016 -0800

----------------------------------------------------------------------
 sentry-tests/sentry-tests-hive/pom.xml          |   1 +
 .../e2e/dbprovider/TestConcurrentClients.java   | 344 +++++++++++++++++++
 .../AbstractTestWithStaticConfiguration.java    |  86 ++++-
 3 files changed, 430 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/pom.xml
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/pom.xml b/sentry-tests/sentry-tests-hive/pom.xml
index 98e4752..472cce7 100644
--- a/sentry-tests/sentry-tests-hive/pom.xml
+++ b/sentry-tests/sentry-tests-hive/pom.xml
@@ -462,6 +462,7 @@ limitations under the License.
           <include>**/TestDbPrivilegesAtColumnScope.java</include>
           <include>**/TestColumnEndToEnd.java</include>
           <include>**/TestDbComplexView.java</include>
+          <include>**/TestConcurrentClients</include>
         </includes>
         <argLine>-Dsentry.e2etest.hiveServer2Type=UnmanagedHiveServer2 -Dsentry.e2etest.DFSType=ClusterDFS -Dsentry.e2etest.external.sentry=true</argLine>
        </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
new file mode 100644
index 0000000..d926797
--- /dev/null
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestConcurrentClients.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sentry.tests.e2e.dbprovider;
+
+import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
+import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.tests.e2e.hive.AbstractTestWithStaticConfiguration;
+
+import org.apache.sentry.tests.e2e.hive.StaticUserGroup;
+import static org.junit.Assume.assumeTrue;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.RandomStringUtils;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * The test class implements concurrency tests to test:
+ * Sentry client, HS2 jdbc client etc.
+ */
+public class TestConcurrentClients extends AbstractTestWithStaticConfiguration {
+  private static final Logger LOGGER = LoggerFactory
+          .getLogger(TestConcurrentClients.class);
+
+  private PolicyFile policyFile;
+
+  // define scale for tests
+  private final int NUM_OF_TABLES = Integer.parseInt(System.getProperty(
+          "sentry.e2e.concurrency.test.tables-per-db", "1"));
+  private final int NUM_OF_PAR = Integer.parseInt(System.getProperty(
+          "sentry.e2e.concurrency.test.partitions-per-tb", "3"));
+  private final int NUM_OF_THREADS = Integer.parseInt(System.getProperty(
+          "sentry.e2e.concurrency.test.threads", "30"));
+  private final int NUM_OF_TASKS = Integer.parseInt(System.getProperty(
+          "sentry.e2e.concurrency.test.tasks", "100"));
+  private final Long HS2_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty(
+          "sentry.e2e.concurrency.test.hs2client.test.time.ms", "10000")); //millis
+  private final Long SENTRY_CLIENT_TEST_DURATION_MS = Long.parseLong(System.getProperty(
+          "sentry.e2e.concurrency.test.sentryclient.test.time.ms", "10000")); //millis
+
+  private static Map<String, String> privileges = new HashMap<String, String>();
+  static {
+    privileges.put("all_db1", "server=server1->db=" + DB1 + "->action=all");
+  }
+
+  @Override
+  @Before
+  public void setup() throws Exception {
+    super.setupAdmin();
+    policyFile = PolicyFile.setAdminOnServer1(ADMINGROUP)
+            .setUserGroupMapping(StaticUserGroup.getStaticMapping());
+    writePolicyFile(policyFile);
+  }
+
+  @BeforeClass
+  public static void setupTestStaticConfiguration() throws Exception {
+    assumeTrue(Boolean.parseBoolean(System.getProperty("sentry.scaletest.oncluster", "false")));
+    useSentryService = true;    // configure sentry client
+    clientKerberos = true; // need to get client configuration from testing environments
+    AbstractTestWithStaticConfiguration.setupTestStaticConfiguration();
+ }
+
+  static String randomString( int len ){
+    return RandomStringUtils.random(len, true, false);
+  }
+
+  private void execStmt(Statement stmt, String sql) throws Exception {
+    LOGGER.info("Running [" + sql + "]");
+    stmt.execute(sql);
+  }
+
+  private void createDbTb(String user, String db, String tb) throws Exception{
+    Connection connection = context.createConnection(user);
+    Statement statement = context.createStatement(connection);
+    try {
+      execStmt(statement, "DROP DATABASE IF EXISTS " + db + " CASCADE");
+      execStmt(statement, "CREATE DATABASE " + db);
+      execStmt(statement, "USE " + db);
+      for (int i = 0; i < NUM_OF_TABLES; i++) {
+        String tbName = tb + "_" + Integer.toString(i);
+        execStmt(statement, "CREATE TABLE  " + tbName + " (a string) PARTITIONED BY (b string)");
+      }
+    } catch (Exception ex) {
+      LOGGER.error("caught exception: " + ex);
+    } finally {
+      statement.close();
+      connection.close();
+    }
+  }
+
+  private void createPartition(String user, String db, String tb) throws Exception{
+    Connection connection = context.createConnection(user);
+    Statement statement = context.createStatement(connection);
+    try {
+      execStmt(statement, "USE " + db);
+      for (int j = 0; j < NUM_OF_TABLES; j++) {
+        String tbName = tb + "_" + Integer.toString(j);
+        for (int i = 0; i < NUM_OF_PAR; i++) {
+          String randStr = randomString(4);
+          String sql = "ALTER TABLE " + tbName + " ADD IF NOT EXISTS PARTITION (b = '" + randStr + "') ";
+          LOGGER.info("[" + i + "] " + sql);
+          execStmt(statement, sql);
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("caught exception: " + ex);
+    } finally {
+      statement.close();
+      connection.close();
+    }
+  }
+
+  private void adminCreateRole(String roleName) throws Exception {
+    Connection connection = context.createConnection(ADMIN1);
+    Statement stmt = context.createStatement(connection);
+    try {
+      execStmt(stmt, "DROP ROLE " + roleName);
+    } catch (Exception ex) {
+      LOGGER.warn("Role does not exist " + roleName);
+    } finally {
+      try {
+        execStmt(stmt, "CREATE ROLE " + roleName);
+      } catch (Exception ex) {
+        LOGGER.error("caught exception when create new role: " + ex);
+      } finally {
+        stmt.close();
+        connection.close();
+      }
+    }
+  }
+
+  private void adminCleanUp(String db, String roleName) throws Exception {
+    Connection connection = context.createConnection(ADMIN1);
+    Statement stmt = context.createStatement(connection);
+    try {
+      execStmt(stmt, "DROP DATABASE IF EXISTS " + db + " CASCADE");
+      execStmt(stmt, "DROP ROLE " + roleName);
+    } catch (Exception ex) {
+      LOGGER.warn("Failed to clean up ", ex);
+    } finally {
+      stmt.close();
+      connection.close();
+    }
+  }
+
+  private void adminShowRole(String roleName) throws Exception {
+    Connection connection = context.createConnection(ADMIN1);
+    Statement stmt = context.createStatement(connection);
+    boolean found = false;
+    try {
+      ResultSet rs = stmt.executeQuery("SHOW ROLES ");
+      while (rs.next()) {
+        if (rs.getString("role").equalsIgnoreCase(roleName)) {
+          LOGGER.info("Found role " + roleName);
+          found = true;
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.error("caught exception when show roles: " + ex);
+    } finally {
+      stmt.close();
+      connection.close();
+    }
+    assertTrue("failed to detect " + roleName, found);
+  }
+
+  private void adminGrant(String test_db, String test_tb,
+                          String roleName, String group) throws Exception {
+    Connection connection = context.createConnection(ADMIN1);
+    Statement stmt = context.createStatement(connection);
+    try {
+      execStmt(stmt, "USE " + test_db);
+      for (int i = 0; i < NUM_OF_TABLES; i++) {
+        String tbName = test_tb + "_" + Integer.toString(i);
+        execStmt(stmt, "GRANT ALL ON TABLE " + tbName + " TO ROLE " + roleName);
+      }
+      execStmt(stmt, "GRANT ROLE " + roleName + " TO GROUP " + group);
+    } catch (Exception ex) {
+      LOGGER.error("caught exception when grant permission and role: " + ex);
+    } finally {
+      stmt.close();
+      connection.close();
+    }
+  }
+
+  /**
+   * A synchronized state class to track concurrency test status from each thread
+   */
+  private final static class TestRuntimeState {
+    private int numSuccess = 0;
+    private boolean failed = false;
+    private Throwable firstException = null;
+
+    public synchronized void setFirstException(Throwable e) {
+      failed = true;
+      if (firstException == null) {
+        firstException = e;
+      }
+    }
+    public synchronized void setNumSuccess() {
+      numSuccess += 1;
+    }
+    public synchronized int getNumSuccess() {
+      return numSuccess;
+    }
+    public synchronized Throwable getFirstException() {
+      return firstException;
+    }
+    public synchronized boolean isFailed() {
+      return failed;
+    }
+   }
+
+  /**
+   * Test when concurrent HS2 clients talking to server,
+   * Privileges are correctly created and updated.
+   * @throws Exception
+   */
+  @Test
+  public void testConccurentHS2Client() throws Exception {
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
+    final TestRuntimeState state = new TestRuntimeState();
+
+    for (int i = 0; i < NUM_OF_TASKS; i ++) {
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          LOGGER.info("Starting tests: create role, show role, create db and tbl, and create partitions");
+          if (state.failed) return;
+          try {
+            Long startTime = System.currentTimeMillis();
+            Long elapsedTime = 0L;
+            while (Long.compare(elapsedTime, HS2_CLIENT_TEST_DURATION_MS) <= 0) {
+              String randStr = randomString(5);
+              String test_role = "test_role_" + randStr;
+              String test_db = "test_db_" + randStr;
+              String test_tb = "test_tb_" + randStr;
+              LOGGER.info("Start to test sentry with hs2 client with role " + test_role);
+              adminCreateRole(test_role);
+              adminShowRole(test_role);
+              createDbTb(ADMIN1, test_db, test_tb);
+              adminGrant(test_db, test_tb, test_role, USERGROUP1);
+              createPartition(USER1_1, test_db, test_tb);
+              adminCleanUp(test_db, test_role);
+              elapsedTime = System.currentTimeMillis() - startTime;
+              LOGGER.info("elapsedTime = " + elapsedTime);
+            }
+            state.setNumSuccess();
+          } catch (Exception e) {
+            LOGGER.error("Exception: " + e);
+            state.setFirstException(e);
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
+      Thread.sleep(1000); //millisecond
+    }
+    Throwable ex = state.getFirstException();
+    assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed);
+    assertEquals(NUM_OF_TASKS, state.getNumSuccess());
+  }
+
+  /**
+   * Test when concurrent sentry clients talking to sentry server, threads data are synchronized
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSentryClient() throws Exception {
+    final String HIVE_KEYTAB_PATH =
+            System.getProperty("sentry.e2etest.hive.policyOwnerKeytab");
+    final SentryPolicyServiceClient client = getSentryClient("hive", HIVE_KEYTAB_PATH);
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
+
+    final TestRuntimeState state = new TestRuntimeState();
+    for (int i = 0; i < NUM_OF_TASKS; i ++) {
+      LOGGER.info("Start to test sentry client with task id [" + i + "]");
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          if (state.failed) {
+            LOGGER.error("found one failed state, abort test from here.");
+            return;
+          }
+          try {
+            String randStr = randomString(5);
+            String test_role = "test_role_" + randStr;
+            LOGGER.info("Start to test role: " + test_role);
+            Long startTime = System.currentTimeMillis();
+            Long elapsedTime = 0L;
+            while (Long.compare(elapsedTime, SENTRY_CLIENT_TEST_DURATION_MS) <= 0) {
+              LOGGER.info("Test role " + test_role + " runs " + elapsedTime + " ms.");
+              client.createRole(ADMIN1, test_role);
+              client.listRoles(ADMIN1);
+              client.grantServerPrivilege(ADMIN1, test_role, "server1", false);
+              client.listAllPrivilegesByRoleName(ADMIN1, test_role);
+              client.dropRole(ADMIN1, test_role);
+              elapsedTime = System.currentTimeMillis() - startTime;
+            }
+            state.setNumSuccess();
+          } catch (Exception e) {
+            LOGGER.error("Sentry Client Testing Exception: ", e);
+            state.setFirstException(e);
+          }
+        }
+      });
+    }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
+      Thread.sleep(1000); //millisecond
+    }
+    Throwable ex = state.getFirstException();
+    assertFalse( ex == null ? "Test failed" : ex.toString(), state.failed);
+    assertEquals(NUM_OF_TASKS, state.getNumSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/5a827f6d/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
----------------------------------------------------------------------
diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
index dc8c1eb..614856f 100644
--- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
+++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hive/AbstractTestWithStaticConfiguration.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -32,7 +33,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
 
+import com.google.common.collect.Sets;
 import junit.framework.Assert;
 
 import org.apache.commons.io.FileUtils;
@@ -51,6 +54,7 @@ import org.apache.sentry.policy.db.DBModelAuthorizables;
 import org.apache.sentry.provider.db.SimpleDBProviderBackend;
 import org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClient;
 import org.apache.sentry.provider.file.PolicyFile;
+import org.apache.sentry.service.thrift.KerberosConfiguration;
 import org.apache.sentry.service.thrift.SentryServiceClientFactory;
 import org.apache.sentry.service.thrift.ServiceConstants.ClientConfig;
 import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
@@ -72,6 +76,10 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.LoginContext;
+
 public abstract class AbstractTestWithStaticConfiguration {
   private static final Logger LOGGER = LoggerFactory
       .getLogger(AbstractTestWithStaticConfiguration.class);
@@ -137,6 +145,38 @@ public abstract class AbstractTestWithStaticConfiguration {
   protected static Context context;
   protected final String semanticException = "SemanticException No valid privileges";
 
+  protected static boolean clientKerberos = false;
+  protected static String REALM = System.getProperty("sentry.service.realm", "EXAMPLE.COM");
+  protected static final String SERVER_KERBEROS_NAME = "sentry/" + SERVER_HOST + "@" + REALM;
+  protected static final String SERVER_KEY_TAB = System.getProperty("sentry.service.server.keytab");
+
+  private static LoginContext clientLoginContext;
+  protected static SentryPolicyServiceClient client;
+
+  /**
+   * Get sentry client with authenticated Subject
+   * (its security-related attributes(for example, kerberos principal and key)
+   * @param clientShortName
+   * @param clientKeyTabDir
+   * @return client's Subject
+   */
+  public static Subject getClientSubject(String clientShortName, String clientKeyTabDir) {
+    String clientKerberosPrincipal = clientShortName + "@" + REALM;
+    File clientKeyTabFile = new File(clientKeyTabDir);
+    Subject clientSubject = new Subject(false, Sets.newHashSet(
+            new KerberosPrincipal(clientKerberosPrincipal)), new HashSet<Object>(),
+            new HashSet<Object>());
+    try {
+      clientLoginContext = new LoginContext("", clientSubject, null,
+              KerberosConfiguration.createClientConfig(clientKerberosPrincipal, clientKeyTabFile));
+      clientLoginContext.login();
+    } catch (Exception ex) {
+      LOGGER.error("Exception: " + ex);
+    }
+    clientSubject = clientLoginContext.getSubject();
+    return clientSubject;
+  }
+
   public static void createContext() throws Exception {
     context = new Context(hiveServer, fileSystem,
         baseDir, confDir, dataDir, policyFileLocation);
@@ -445,6 +485,51 @@ public abstract class AbstractTestWithStaticConfiguration {
     return SentryServiceClientFactory.create(sentryServer.get(0).getConf());
   }
 
+  /**
+   * Get Sentry authorized client to communicate with sentry server,
+   * the client can be for a minicluster, real distributed cluster,
+   * sentry server can use policy file or it's a service.
+   * @param clientShortName: principal prefix string
+   * @param clientKeyTabDir: authorization key path
+   * @return sentry client to talk to sentry server
+   * @throws Exception
+   */
+  public static SentryPolicyServiceClient getSentryClient(String clientShortName,
+                                                          String clientKeyTabDir) throws Exception {
+    if (!useSentryService) {
+      LOGGER.info("Running on a minicluser env.");
+      return getSentryClient();
+    }
+
+    if (clientKerberos) {
+      if (sentryConf == null ) {
+        sentryConf = new Configuration(false);
+      }
+      final String SENTRY_HOST = System.getProperty("sentry.host", SERVER_HOST);
+      final String SERVER_KERBEROS_PRINCIPAL = "sentry/" + SENTRY_HOST + "@" + REALM;
+      sentryConf.set(ServerConfig.PRINCIPAL, SERVER_KERBEROS_PRINCIPAL);
+      sentryConf.set(ServerConfig.KEY_TAB, SERVER_KEY_TAB);
+      sentryConf.set(ServerConfig.ALLOW_CONNECT, "hive");
+      sentryConf.set(ServerConfig.SECURITY_USE_UGI_TRANSPORT, "false");
+      sentryConf.set(ClientConfig.SERVER_RPC_ADDRESS,
+              System.getProperty("sentry.service.server.rpc.address"));
+      sentryConf.set(ClientConfig.SERVER_RPC_PORT,
+              System.getProperty("sentry.service.server.rpc.port", "8038"));
+      sentryConf.set(ClientConfig.SERVER_RPC_CONN_TIMEOUT, "720000"); //millis
+      Subject clientSubject = getClientSubject(clientShortName, clientKeyTabDir);
+      client = Subject.doAs(clientSubject,
+              new PrivilegedExceptionAction<SentryPolicyServiceClient>() {
+                @Override
+                public SentryPolicyServiceClient run() throws Exception {
+                  return SentryServiceClientFactory.create(sentryConf);
+                }
+              });
+    } else {
+      client = getSentryClient();
+    }
+    return client;
+  }
+
   @Before
   public void setup() throws Exception{
     LOGGER.info("AbstractTestStaticConfiguration setup");
@@ -627,5 +712,4 @@ public abstract class AbstractTestWithStaticConfiguration {
     LOGGER.info("Running [" + sql + "]");
     stmt.execute(sql);
   }
-
 }