Posted to commits@impala.apache.org by jo...@apache.org on 2019/12/28 02:17:38 UTC

[impala] branch master updated (320f058 -> fa7d91f)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 320f058  IMPALA-8974: Fixed a bug when create kudu managed table without HMS config
     new 05dfb20  IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'
     new fa7d91f  IMPALA-9241: Remove pid files on successful shutdown of minicluster

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   8 ++
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/service/Frontend.java   | 124 +++++++++++++++++----
 .../node_templates/common/etc/init.d/common.tmpl   |   2 +
 tests/authorization/test_authorization.py          |  24 ++++
 7 files changed, 146 insertions(+), 20 deletions(-)


[impala] 02/02: IMPALA-9241: Remove pid files on successful shutdown of minicluster

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fa7d91fd305bac2dc5c4a145528b6cc45f5972fe
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Dec 26 12:21:50 2019 -0800

    IMPALA-9241: Remove pid files on successful shutdown of minicluster
    
    The minicluster init scripts currently keep track of pids
    for HDFS, YARN, etc. by writing each service's pid to a file.
    They use the pid in the file to determine what is running and
    needs to be shut down or started. Currently, they do not remove
    the pid file after the minicluster shuts down, so they may later
    read a stale pid from the file when checking whether the service
    is already running. If that pid has been reused by another
    process, a necessary service can fail to start.
    
    This removes the pid files when the minicluster components shut down
    successfully.
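The init script's stop loop and the effect of removing the pid file can be sketched in Python as follows. This is a transliteration for illustration only, not the init script itself: the helper names mirror the script's `stop` and `pid_exists` in name only, and sending SIGTERM, the early return when nothing is running, and the injectable `exists`/`kill` parameters (added so the logic can be exercised without real processes) are assumptions.

```python
import os
import signal
import time


def pid_exists(pid):
    """Return True if a process with this pid exists (signal 0 only probes)."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The pid exists but belongs to another user.
        return True
    return True


def read_pid(pid_file):
    """Return the pid recorded in pid_file, or None if there is no pid file."""
    try:
        with open(pid_file) as f:
            return int(f.read().strip())
    except FileNotFoundError:
        return None


def stop(pid_file, timeout_secs=10, exists=pid_exists, kill=os.kill):
    """Stop the service recorded in pid_file (hypothetical helper).

    Returns True on a successful shutdown. The pid file is removed only when
    the process is seen to exit; on timeout it is left in place.
    """
    pid = read_pid(pid_file)
    if pid is None or not exists(pid):
        return True  # Assumed: nothing to stop.
    kill(pid, signal.SIGTERM)  # Assumed signal; the script's choice is not shown.
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        if not exists(pid):
            # Remove the pid file so that a later reuse of this pid is not
            # mistaken for the service still running.
            os.remove(pid_file)
            return True
        time.sleep(0.1)
    return False
```

Without the `os.remove` call, a later start or status check would read the old pid, and `pid_exists` would report True whenever the operating system had handed that pid to an unrelated process.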
    
    Change-Id: I5b14d74df8061b6595b9897df9c9667e3f569e34
    Reviewed-on: http://gerrit.cloudera.org:8080/14950
    Reviewed-by: Andrew Sherman <as...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 testdata/cluster/node_templates/common/etc/init.d/common.tmpl | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/testdata/cluster/node_templates/common/etc/init.d/common.tmpl b/testdata/cluster/node_templates/common/etc/init.d/common.tmpl
index 51197c5..525ffdb 100644
--- a/testdata/cluster/node_templates/common/etc/init.d/common.tmpl
+++ b/testdata/cluster/node_templates/common/etc/init.d/common.tmpl
@@ -80,6 +80,8 @@ function stop {
   local DEADLINE=$(($(date +%s) + $TIMEOUT_SECS))
   while [[ $(date +%s) -lt $DEADLINE ]]; do
     if ! pid_exists $PID; then
+      # Remove the pid file so that there is no confusion if the pids wrap
+      rm -f $PID_FILE
       return
     fi
     sleep 0.1


[impala] 01/02: IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05dfb208ff32d434399cb18219425467b0f9b2a9
Author: xuzhou <pe...@gmail.com>
AuthorDate: Fri Dec 20 17:25:39 2019 +0800

    IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'
    
    When Sentry authorization is enabled, 'show tables/databases' can be
    slow for users with multiple group policies. It seems that
    ResourceAuthorizationProvider.hasAccess performs poorly for users
    with complex group policies; IMPALA-9242 will address this
    problem.
    
    This patch adds a config option 'num_check_authorization_threads'
    that accelerates 'show tables/databases' by checking authorization
    with multiple threads. It applies only when authorization is
    enabled. A value of 1 disables multithreaded execution for checking
    access. However, a small value greater than 1 may limit the
    parallelism of FE requests under high concurrency, because all
    requests share the same pool of checking threads. The value must be
    in the range 1 to 128. The default value of
    'num_check_authorization_threads' is 1.
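How the flag value selects an executor (see the Frontend constructor change, which uses MoreExecutors.sameThreadExecutor() for 1 and Executors.newFixedThreadPool(n) otherwise) can be sketched with Python's concurrent.futures. This is an illustrative analogue, not Impala code: `SameThreadExecutor` and `make_check_authorization_pool` are hypothetical names, and raising ValueError stands in for the Preconditions.checkState range check.

```python
from concurrent.futures import Executor, Future, ThreadPoolExecutor

MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128


class SameThreadExecutor(Executor):
    """Hypothetical executor that runs each task immediately in the caller's thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as e:
            # Surface the failure through the future, like a pooled task would.
            future.set_exception(e)
        return future


def make_check_authorization_pool(num_threads):
    """Choose an executor as the flag describes: 1 means no extra threads."""
    if not 1 <= num_threads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE:
        raise ValueError("num_check_authorization_threads must be in [1, %d], got %d"
                         % (MAX_CHECK_AUTHORIZATION_POOL_SIZE, num_threads))
    if num_threads == 1:
        return SameThreadExecutor()
    return ThreadPoolExecutor(max_workers=num_threads)
```

With a value of 1 every request runs its checks in its own thread, so concurrent requests do not contend for anything. With a larger value a single fixed-size pool is shared by every request, which is why a small value greater than 1 can cap total throughput under high concurrency.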
    
    Change-Id: I860e0d18afa0421665f8b3b1c5561d6bdacc5e96
    Reviewed-on: http://gerrit.cloudera.org:8080/14846
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                      |   8 ++
 be/src/util/backend-gflag-util.cc                  |   2 +
 common/thrift/BackendGflags.thrift                 |   2 +
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/service/Frontend.java   | 124 +++++++++++++++++----
 tests/authorization/test_authorization.py          |  24 ++++
 6 files changed, 144 insertions(+), 20 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 451da40..40841b5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -300,6 +300,14 @@ DEFINE_int64(impala_slow_rpc_threshold_ms, 2 * 60 * 1000,
     "may result in false positives"
     "This overrides KRPC's --rpc_duration_too_long_ms setting.");
 
+DEFINE_int32(num_check_authorization_threads, 1,
+    "The number of threads used to check authorization for the user when executing show "
+    "tables/databases. This configuration is applicable only when authorization is "
+    "enabled. A value of 1 disables multi-threaded execution for checking authorization. "
+    "However, a small value larger than 1 may limit the parallelism of FE requests when "
+    "checking authorization under high concurrency. The value must be in the range of "
+    "1 to 128.");
+
 // ++========================++
 // || Startup flag graveyard ||
 // ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 6906e1d..467873b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -83,6 +83,7 @@ DECLARE_bool(unlock_zorder_sort);
 DECLARE_string(blacklisted_tables);
 DECLARE_string(min_privilege_set_for_show_stmts);
 DECLARE_int32(num_expected_executors);
+DECLARE_int32(num_check_authorization_threads);
 
 namespace impala {
 
@@ -169,6 +170,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
   cfg.__set_min_privilege_set_for_show_stmts(FLAGS_min_privilege_set_for_show_stmts);
   cfg.__set_num_expected_executors(FLAGS_num_expected_executors);
+  cfg.__set_num_check_authorization_threads(FLAGS_num_check_authorization_threads);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3489793..4762654 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -147,4 +147,6 @@ struct TBackendGflags {
   61: required bool mt_dop_auto_fallback
 
   62: required i32 num_expected_executors
+
+  63: required i32 num_check_authorization_threads
 }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 47c4660..d3abf78 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -197,6 +197,10 @@ public class BackendConfig {
     return backendCfg_.min_privilege_set_for_show_stmts;
   }
 
+  public int getNumCheckAuthorizationThreads() {
+    return backendCfg_.num_check_authorization_threads;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5c36f37..f01d787 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -33,6 +33,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -172,6 +175,7 @@ import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
@@ -193,6 +197,10 @@ public class Frontend {
   private static final int INCONSISTENT_METADATA_NUM_RETRIES =
       BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries();
 
+  // Maximum number of threads used to check authorization for the user when executing
+  // show tables/databases.
+  private static final int MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128;
+
   /**
    * Plan-time context that allows capturing various artifacts created
    * during the process.
@@ -271,6 +279,8 @@ public class Frontend {
 
   private final TransactionKeepalive transactionKeepalive_;
 
+  private static ExecutorService checkAuthorizationPool_;
+
   public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
     this(authzFactory, FeCatalogManager.createFromBackendConfig());
   }
@@ -294,6 +304,15 @@ public class Frontend {
     if (authzConfig.isEnabled()) {
       authzChecker_.set(authzFactory.newAuthorizationChecker(
           getCatalog().getAuthPolicy()));
+      int numThreads = BackendConfig.INSTANCE.getNumCheckAuthorizationThreads();
+      Preconditions.checkState(numThreads > 0
+        && numThreads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE);
+      if (numThreads == 1) {
+        checkAuthorizationPool_ = MoreExecutors.sameThreadExecutor();
+      } else {
+        LOG.info("Using a thread pool of size {} for authorization", numThreads);
+        checkAuthorizationPool_ = Executors.newFixedThreadPool(numThreads);
+      }
     } else {
       authzChecker_.set(authzFactory.newAuthorizationChecker());
     }
@@ -776,6 +795,31 @@ public class Frontend {
   }
 
   /**
+   * A Callable wrapper used for checking authorization to tables/databases.
+   */
+  private class CheckAuthorization implements Callable<Boolean> {
+    private final String dbName_;
+    private final String tblName_;
+    private final String owner_;
+    private final User user_;
+
+    public CheckAuthorization(String dbName, String tblName, String owner, User user) {
+      // dbName and user cannot be null, tblName and owner can be null.
+      Preconditions.checkNotNull(dbName);
+      Preconditions.checkNotNull(user);
+      dbName_ = dbName;
+      tblName_ = tblName;
+      owner_ = owner;
+      user_ = user;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return new Boolean(isAccessibleToUser(dbName_, tblName_, owner_, user_));
+    }
+  }
+
+  /**
    * Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
    * accessible to 'user'.
    */
@@ -792,11 +836,40 @@ public class Frontend {
     }
   }
 
+  /**
+   * This method filters out elements from the given list based on the results
+   * of the pendingCheckTasks.
+   */
+  private void filterUnaccessibleElements(List<Future<Boolean>> pendingCheckTasks,
+    List<?> checkList) throws InternalException {
+    int failedCheckTasks = 0;
+    int index = 0;
+    Iterator<?> iter = checkList.iterator();
+
+    Preconditions.checkState(checkList.size() == pendingCheckTasks.size());
+    while (iter.hasNext()) {
+      iter.next();
+      try {
+        if (!pendingCheckTasks.get(index).get()) iter.remove();
+        index++;
+      } catch (ExecutionException | InterruptedException e) {
+        failedCheckTasks++;
+        LOG.error("Encountered an error checking access", e);
+        break;
+      }
+    }
+
+    if (failedCheckTasks > 0)
+      throw new InternalException("Failed to check access. " +
+          "Check the server log for more details.");
+  }
+
   private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
       User user) throws ImpalaException {
     FeCatalog catalog = getCatalog();
     List<String> tblNames = catalog.getTableNames(dbName, matcher);
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+      List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
       Iterator<String> iter = tblNames.iterator();
       while (iter.hasNext()) {
         String tblName = iter.next();
@@ -811,18 +884,15 @@ public class Frontend {
         String tableOwner = table.getOwnerUser();
         if (tableOwner == null) {
           LOG.info("Table {} not yet loaded, ignoring it in table listing.",
-              dbName + "." + tblName);
-        }
-        Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
-            authzFactory_.getAuthorizableFactory())
-            .anyOf(minPrivilegeSetForShowStmts_)
-            .onAnyColumn(dbName, tblName, tableOwner)
-            .buildSet();
-        if (!authzChecker_.get().hasAnyAccess(user, requests)) {
-          iter.remove();
+            dbName + "." + tblName);
         }
+        pendingCheckTasks.add(checkAuthorizationPool_.submit(
+            new CheckAuthorization(dbName, tblName, tableOwner, user)));
       }
+
+      filterUnaccessibleElements(pendingCheckTasks, tblNames);
     }
+
     return tblNames;
   }
 
@@ -944,29 +1014,43 @@ public class Frontend {
     // have permissions on.
     if (authzFactory_.getAuthorizationConfig().isEnabled()) {
       Iterator<? extends FeDb> iter = dbs.iterator();
+      List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
       while (iter.hasNext()) {
         FeDb db = iter.next();
-        if (!isAccessibleToUser(db, user)) iter.remove();
+        pendingCheckTasks.add(checkAuthorizationPool_.submit(
+            new CheckAuthorization(db.getName(), null, db.getOwnerUser(), user)));
       }
+
+      filterUnaccessibleElements(pendingCheckTasks, dbs);
     }
+
     return dbs;
   }
 
   /**
-   * Check whether database is accessible to given user.
+   * Check whether table/database is accessible to given user.
    */
-  private boolean isAccessibleToUser(FeDb db, User user)
-      throws InternalException {
-    if (db.getName().toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
+  private boolean isAccessibleToUser(String dbName, String tblName,
+      String owner, User user) throws InternalException {
+    Preconditions.checkNotNull(dbName);
+    if (tblName == null &&
+        dbName.toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
       // Default DB should always be shown.
       return true;
     }
-    Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
-        authzFactory_.getAuthorizableFactory())
-        .anyOf(minPrivilegeSetForShowStmts_)
-        .onAnyColumn(db.getName(), db.getOwnerUser())
-        .buildSet();
-    return authzChecker_.get().hasAnyAccess(user, requests);
+
+    PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder(
+      authzFactory_.getAuthorizableFactory())
+      .anyOf(minPrivilegeSetForShowStmts_);
+    if (tblName == null) {
+      // Check database
+      builder = builder.onAnyColumn(dbName, owner);
+    } else {
+      // Check table
+      builder = builder.onAnyColumn(dbName, tblName, owner);
+    }
+
+    return authzChecker_.get().hasAnyAccess(user, builder.buildSet());
   }
 
   /**
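The pattern used by CheckAuthorization and filterUnaccessibleElements in the Frontend.java diff above (submit one check per table or database, then read the futures back in list order and drop entries whose check returned false) can be sketched in Python as follows. The names are hypothetical; `InternalError` stands in for InternalException, `has_access` stands in for the authorization checker, and the value "default" for the default database name is an assumption. Unlike the Java code, which removes entries from the list in place, this sketch returns a new list.

```python
from concurrent.futures import ThreadPoolExecutor

DEFAULT_DB = "default"  # Assumed value of Catalog.DEFAULT_DB.


class InternalError(Exception):
    """Stand-in for Impala's InternalException."""


def filter_accessible(names, check, executor):
    """Keep the names for which check(name) is True, preserving their order.

    All checks are submitted first so they can run concurrently; results are
    then read back in submission order. The first failed check aborts the
    listing instead of silently dropping the entry.
    """
    futures = [executor.submit(check, name) for name in names]
    kept = []
    for name, future in zip(names, futures):
        try:
            if future.result():
                kept.append(name)
        except Exception as e:
            raise InternalError("Failed to check access. "
                                "Check the server log for more details.") from e
    return kept


def is_accessible_to_user(db_name, tbl_name, has_access):
    """The default database is always listed; anything else asks the checker."""
    if tbl_name is None and db_name.lower() == DEFAULT_DB:
        return True
    return has_access(db_name, tbl_name)
```

Reading the futures in submission order keeps the output order identical to the single-threaded code, even though the checks themselves may finish in any order.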
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 885d184..7f57ddb 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -22,6 +22,7 @@ import pytest
 import tempfile
 import grp
 import re
+import random
 import sys
 import subprocess
 import urllib
@@ -651,3 +652,26 @@ class TestAuthorization(CustomClusterTestSuite):
                   "--ranger_app_id=impala --authorization_provider=ranger")
   def test_ranger_show_stmts_with_any(self, unique_name):
     self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--server-name=server1 --ranger_service_type=hive "
+                 "--ranger_app_id=impala --authorization_provider=ranger "
+                 "--num_check_authorization_threads=%d" % (random.randint(2, 128)),
+    catalogd_args="--server-name=server1 --ranger_service_type=hive "
+                  "--ranger_app_id=impala --authorization_provider=ranger")
+  def test_num_check_authorization_threads_with_ranger(self, unique_name):
+    self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--server_name=server1 --sentry_config=%s "
+                 "--authorized_proxy_user_config=%s=* "
+                 "--num_check_authorization_threads=%d" %
+                 (SENTRY_CONFIG_FILE, getuser(), random.randint(2, 128)),
+    catalogd_args="--sentry_config={0}".format(SENTRY_CONFIG_FILE),
+    sentry_config=SENTRY_CONFIG_FILE_OO,  # Enable Sentry Object Ownership
+    sentry_log_dir="{0}/test_num_check_authorization_threads_with_sentry"
+                   .format(SENTRY_BASE_LOG_DIR))
+  def test_num_check_authorization_threads_with_sentry(self, unique_role, unique_name):
+    self._test_sentry_show_stmts_helper(unique_role, unique_name, PRIVILEGES)