You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/12/28 02:17:39 UTC
[impala] 01/02: IMPALA-9195: Using multithreaded execution to
accelerate 'show tables/databases'
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 05dfb208ff32d434399cb18219425467b0f9b2a9
Author: xuzhou <pe...@gmail.com>
AuthorDate: Fri Dec 20 17:25:39 2019 +0800
IMPALA-9195: Using multithreaded execution to accelerate 'show tables/databases'
If Sentry authorization is enabled, 'show tables/databases' can be slow for
users who have many group policies. ResourceAuthorizationProvider.hasAccess
appears to perform poorly for users with complex group policies; IMPALA-9242
will address this problem.
This patch adds a config option 'num_check_authorization_threads' that
accelerates 'show tables/databases' by checking authorization in multiple
threads. This configuration applies only when authorization is enabled. A value
of 1 disables multi-threaded execution for checking access. Because the thread
pool is shared by all FE requests, a small value larger than 1 may limit the
parallelism of FE requests that check authorization under high concurrency. The
value must be in the range 1 to 128. The default value of
'num_check_authorization_threads' is 1.
Change-Id: I860e0d18afa0421665f8b3b1c5561d6bdacc5e96
Reviewed-on: http://gerrit.cloudera.org:8080/14846
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/common/global-flags.cc | 8 ++
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../org/apache/impala/service/BackendConfig.java | 4 +
.../java/org/apache/impala/service/Frontend.java | 124 +++++++++++++++++----
tests/authorization/test_authorization.py | 24 ++++
6 files changed, 144 insertions(+), 20 deletions(-)
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 451da40..40841b5 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -300,6 +300,14 @@ DEFINE_int64(impala_slow_rpc_threshold_ms, 2 * 60 * 1000,
"may result in false positives"
"This overrides KRPC's --rpc_duration_too_long_ms setting.");
+DEFINE_int32(num_check_authorization_threads, 1,
+ "The number of threads used to check authorization for the user when executing show "
+ "tables/databases. This configuration is applicable only when authorization is "
+ "enabled. A value of 1 disables multi-threaded execution for checking authorization. "
+ "However, a small value larger than 1 may limit the parallelism of FE requests when "
+ "checking authorization with high concurrency. The value must be in the range of "
+ "1 to 128.");
+
// ++========================++
// || Startup flag graveyard ||
// ++========================++
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 6906e1d..467873b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -83,6 +83,7 @@ DECLARE_bool(unlock_zorder_sort);
DECLARE_string(blacklisted_tables);
DECLARE_string(min_privilege_set_for_show_stmts);
DECLARE_int32(num_expected_executors);
+DECLARE_int32(num_check_authorization_threads); // Defined in global-flags.cc.
namespace impala {
@@ -169,6 +170,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
cfg.__set_blacklisted_tables(FLAGS_blacklisted_tables);
cfg.__set_min_privilege_set_for_show_stmts(FLAGS_min_privilege_set_for_show_stmts);
cfg.__set_num_expected_executors(FLAGS_num_expected_executors);
+ cfg.__set_num_check_authorization_threads(FLAGS_num_check_authorization_threads);
RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 3489793..4762654 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -147,4 +147,6 @@ struct TBackendGflags {
61: required bool mt_dop_auto_fallback
62: required i32 num_expected_executors
+
+ 63: required i32 num_check_authorization_threads // Must be in the range 1 to 128.
}
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 47c4660..d3abf78 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -197,6 +197,10 @@ public class BackendConfig {
return backendCfg_.min_privilege_set_for_show_stmts;
}
+ public int getNumCheckAuthorizationThreads() {
+ return backendCfg_.num_check_authorization_threads; // 1 disables the thread pool.
+ }
+
// Inits the auth_to_local configuration in the static KerberosName class.
private static void initAuthToLocal() {
// If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 5c36f37..f01d787 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -33,6 +33,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -172,6 +175,7 @@ import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
/**
@@ -193,6 +197,10 @@ public class Frontend {
private static final int INCONSISTENT_METADATA_NUM_RETRIES =
BackendConfig.INSTANCE.getLocalCatalogMaxFetchRetries();
+ // Maximum number of threads used to check authorization for the user when executing
+ // show tables/databases.
+ private static final int MAX_CHECK_AUTHORIZATION_POOL_SIZE = 128;
+
/**
* Plan-time context that allows capturing various artifacts created
* during the process.
@@ -271,6 +279,8 @@ public class Frontend {
private final TransactionKeepalive transactionKeepalive_;
+ private ExecutorService checkAuthorizationPool_; // Null unless authz is enabled.
+
public Frontend(AuthorizationFactory authzFactory) throws ImpalaException {
this(authzFactory, FeCatalogManager.createFromBackendConfig());
}
@@ -294,6 +304,15 @@ public class Frontend {
if (authzConfig.isEnabled()) {
authzChecker_.set(authzFactory.newAuthorizationChecker(
getCatalog().getAuthPolicy()));
+ int numThreads = BackendConfig.INSTANCE.getNumCheckAuthorizationThreads();
+ Preconditions.checkState(numThreads > 0
+ && numThreads <= MAX_CHECK_AUTHORIZATION_POOL_SIZE);
+ if (numThreads == 1) {
+ checkAuthorizationPool_ = MoreExecutors.sameThreadExecutor();
+ } else {
+ LOG.info("Using a thread pool of size {} for authorization", numThreads);
+ checkAuthorizationPool_ = Executors.newFixedThreadPool(numThreads);
+ }
} else {
authzChecker_.set(authzFactory.newAuthorizationChecker());
}
@@ -776,6 +795,31 @@ public class Frontend {
}
/**
+ * Callable that evaluates isAccessibleToUser() for one table or database.
+ */
+ private class CheckAuthorization implements Callable<Boolean> {
+ private final String dbName_;
+ private final String tblName_;
+ private final String owner_;
+ private final User user_;
+
+ public CheckAuthorization(String dbName, String tblName, String owner, User user) {
+ // dbName and user cannot be null; tblName (database check) and owner can be null.
+ Preconditions.checkNotNull(dbName);
+ Preconditions.checkNotNull(user);
+ dbName_ = dbName;
+ tblName_ = tblName;
+ owner_ = owner;
+ user_ = user;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ return isAccessibleToUser(dbName_, tblName_, owner_, user_);
+ }
+ }
+
+ /**
* Returns all tables in database 'dbName' that match the pattern of 'matcher' and are
* accessible to 'user'.
*/
@@ -792,11 +836,40 @@ public class Frontend {
}
}
+ /**
+ * Filters 'checkList' in place using the results of 'pendingCheckTasks': the
+ * element at position i is removed if the i-th task returned false, so both lists
+ * must have the same size and order.
+ * @throws InternalException if any check failed or the wait was interrupted; the
+ * outstanding tasks are cancelled (without interrupting running ones) first.
+ */
+ private void filterInaccessibleElements(List<Future<Boolean>> pendingCheckTasks,
+ List<?> checkList) throws InternalException {
+ Preconditions.checkState(checkList.size() == pendingCheckTasks.size());
+ Iterator<?> iter = checkList.iterator();
+
+ for (Future<Boolean> task : pendingCheckTasks) {
+ iter.next();
+ try {
+ if (!task.get()) iter.remove();
+ } catch (ExecutionException | InterruptedException e) {
+ // Preserve the interrupt status for callers further up the stack.
+ if (e instanceof InterruptedException) Thread.currentThread().interrupt();
+ LOG.error("Encountered an error checking access", e);
+ // Don't let checks whose results will be discarded occupy the pool.
+ for (Future<Boolean> pending : pendingCheckTasks) pending.cancel(false);
+ throw new InternalException("Failed to check access. " +
+ "Check the server log for more details.");
+ }
+ }
+ }
+
private List<String> doGetTableNames(String dbName, PatternMatcher matcher,
User user) throws ImpalaException {
FeCatalog catalog = getCatalog();
List<String> tblNames = catalog.getTableNames(dbName, matcher);
if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+ List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
Iterator<String> iter = tblNames.iterator();
while (iter.hasNext()) {
String tblName = iter.next();
@@ -811,18 +884,15 @@ public class Frontend {
String tableOwner = table.getOwnerUser();
if (tableOwner == null) {
LOG.info("Table {} not yet loaded, ignoring it in table listing.",
- dbName + "." + tblName);
- }
- Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
- authzFactory_.getAuthorizableFactory())
- .anyOf(minPrivilegeSetForShowStmts_)
- .onAnyColumn(dbName, tblName, tableOwner)
- .buildSet();
- if (!authzChecker_.get().hasAnyAccess(user, requests)) {
- iter.remove();
+ dbName + "." + tblName);
}
+ pendingCheckTasks.add(checkAuthorizationPool_.submit(
+ new CheckAuthorization(dbName, tblName, tableOwner, user)));
}
+
+ filterInaccessibleElements(pendingCheckTasks, tblNames);
}
+
return tblNames;
}
@@ -944,29 +1014,43 @@ public class Frontend {
// have permissions on.
if (authzFactory_.getAuthorizationConfig().isEnabled()) {
Iterator<? extends FeDb> iter = dbs.iterator();
+ List<Future<Boolean>> pendingCheckTasks = Lists.newArrayList();
while (iter.hasNext()) {
FeDb db = iter.next();
- if (!isAccessibleToUser(db, user)) iter.remove();
+ pendingCheckTasks.add(checkAuthorizationPool_.submit(
+ new CheckAuthorization(db.getName(), null, db.getOwnerUser(), user)));
}
+
+ filterInaccessibleElements(pendingCheckTasks, dbs);
}
+
return dbs;
}
/**
- * Check whether database is accessible to given user.
+ * Checks whether table 'tblName' (or, if null, database 'dbName') is visible to 'user'.
*/
- private boolean isAccessibleToUser(FeDb db, User user)
- throws InternalException {
- if (db.getName().toLowerCase().equals(Catalog.DEFAULT_DB.toLowerCase())) {
+ private boolean isAccessibleToUser(String dbName, String tblName,
+ String owner, User user) throws InternalException {
+ Preconditions.checkNotNull(dbName);
+ if (tblName == null &&
+ dbName.equalsIgnoreCase(Catalog.DEFAULT_DB)) {
// Default DB should always be shown.
return true;
}
- Set<PrivilegeRequest> requests = new PrivilegeRequestBuilder(
- authzFactory_.getAuthorizableFactory())
- .anyOf(minPrivilegeSetForShowStmts_)
- .onAnyColumn(db.getName(), db.getOwnerUser())
- .buildSet();
- return authzChecker_.get().hasAnyAccess(user, requests);
+
+ PrivilegeRequestBuilder builder = new PrivilegeRequestBuilder(
+ authzFactory_.getAuthorizableFactory())
+ .anyOf(minPrivilegeSetForShowStmts_);
+ if (tblName == null) {
+ // Database-level check.
+ builder = builder.onAnyColumn(dbName, owner);
+ } else {
+ // Table-level check.
+ builder = builder.onAnyColumn(dbName, tblName, owner);
+ }
+
+ return authzChecker_.get().hasAnyAccess(user, builder.buildSet());
}
/**
diff --git a/tests/authorization/test_authorization.py b/tests/authorization/test_authorization.py
index 885d184..7f57ddb 100644
--- a/tests/authorization/test_authorization.py
+++ b/tests/authorization/test_authorization.py
@@ -22,6 +22,7 @@ import pytest
import tempfile
import grp
import re
+import random
import sys
import subprocess
import urllib
@@ -651,3 +652,26 @@ class TestAuthorization(CustomClusterTestSuite):
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_stmts_with_any(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+ @pytest.mark.execute_serially  # Reruns the Ranger show-stmt checks with N > 1 threads.
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger "
+ "--num_check_authorization_threads=%d" % (random.randint(2, 128)),
+ catalogd_args="--server-name=server1 --ranger_service_type=hive "
+ "--ranger_app_id=impala --authorization_provider=ranger")
+ def test_num_check_authorization_threads_with_ranger(self, unique_name):
+ self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
+
+ @pytest.mark.execute_serially  # Reruns the Sentry show-stmt checks with N > 1 threads.
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--server_name=server1 --sentry_config=%s "
+ "--authorized_proxy_user_config=%s=* "
+ "--num_check_authorization_threads=%d" %  # N is chosen once, at import time.
+ (SENTRY_CONFIG_FILE, getuser(), random.randint(2, 128)),
+ catalogd_args="--sentry_config={0}".format(SENTRY_CONFIG_FILE),
+ sentry_config=SENTRY_CONFIG_FILE_OO, # Enable Sentry Object Ownership
+ sentry_log_dir="{0}/test_num_check_authorization_threads_with_sentry"
+ .format(SENTRY_BASE_LOG_DIR))
+ def test_num_check_authorization_threads_with_sentry(self, unique_role, unique_name):
+ self._test_sentry_show_stmts_helper(unique_role, unique_name, PRIVILEGES)