You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/05/01 15:46:38 UTC
[impala] 04/04: IMPALA-8293 (Part 2): Add support for Ranger cache
invalidation
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit eb97c746d2309fcf78ff3b50751cd5e27101539a
Author: Fredy Wijaya <fw...@cloudera.com>
AuthorDate: Fri Apr 26 15:39:16 2019 -0500
IMPALA-8293 (Part 2): Add support for Ranger cache invalidation
This patch adds support for Ranger cache invalidation via INVALIDATE
METADATA and REFRESH AUTHORIZATION. This patch introduces a new catalog
object type called AUTHZ_CACHE_INVALIDATION to allow broadcasting
messages from Catalogd to Impalads to update their local Ranger caches.
For a better user experience, every GRANT/REVOKE statement will invalidate
the Ranger authorization cache.
Testing:
- Replaced the sleep in test_ranger.py with INVALIDATE METADATA or
REFRESH AUTHORIZATION
- Ran all FE tests
- Ran all E2E authorization tests
Change-Id: Ia7160c082298e0b8cc2742dd3facbd4978581288
Reviewed-on: http://gerrit.cloudera.org:8080/13134
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/catalog/catalog-util.cc | 6 ++
common/thrift/CatalogObjects.thrift | 13 +++
.../impala/authorization/AuthorizationChecker.java | 5 +
.../authorization/NoopAuthorizationFactory.java | 3 +
.../ranger/RangerAuthorizationChecker.java | 16 ++++
.../ranger/RangerAuthorizationFactory.java | 2 +-
.../ranger/RangerCatalogdAuthorizationManager.java | 33 ++++++-
.../sentry/SentryAuthorizationChecker.java | 5 +
.../impala/catalog/AuthzCacheInvalidation.java | 53 +++++++++++
.../java/org/apache/impala/catalog/Catalog.java | 27 ++++++
.../impala/catalog/CatalogServiceCatalog.java | 68 ++++++++++++++
.../org/apache/impala/catalog/ImpaladCatalog.java | 31 ++++++-
.../impala/catalog/local/CatalogdMetaProvider.java | 29 +++++-
.../apache/impala/service/FeCatalogManager.java | 12 ++-
.../java/org/apache/impala/service/Frontend.java | 1 +
.../impala/analysis/AuthorizationStmtTest.java | 2 +-
.../org/apache/impala/common/FrontendTestBase.java | 3 +
.../apache/impala/testutil/ImpaladTestCatalog.java | 4 +-
fe/src/test/resources/ranger-hive-security.xml | 2 +-
tests/authorization/test_ranger.py | 103 +++++++++------------
20 files changed, 349 insertions(+), 69 deletions(-)
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index d2b027b..4e628d7 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -266,6 +266,12 @@ Status TCatalogObjectFromObjectName(const TCatalogObjectType::type& object_type,
}
break;
}
+ case TCatalogObjectType::AUTHZ_CACHE_INVALIDATION: {
+ catalog_object->__set_type(object_type);
+ catalog_object->__set_authz_cache_invalidation(TAuthzCacheInvalidation());
+ catalog_object->authz_cache_invalidation.__set_marker_name(object_name);
+ break;
+ }
case TCatalogObjectType::CATALOG:
case TCatalogObjectType::UNKNOWN:
default:
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index f8cb8d5..4682fb7 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -39,6 +39,8 @@ enum TCatalogObjectType {
PRINCIPAL = 7
PRIVILEGE = 8
HDFS_CACHE_POOL = 9
+ // A catalog object type as a marker for authorization cache invalidation.
+ AUTHZ_CACHE_INVALIDATION = 10
}
enum TTableType {
@@ -586,6 +588,14 @@ struct THdfsCachePool {
// the pool limits, pool owner, etc.
}
+// Thrift representation of a TAuthzCacheInvalidation. This catalog object does not
+// contain any authorization data and it's used as a marker to perform an authorization
+// cache invalidation.
+struct TAuthzCacheInvalidation {
+ // Name of the authorization cache marker.
+ 1: required string marker_name
+}
+
// Represents state associated with the overall catalog.
struct TCatalog {
// The CatalogService service ID.
@@ -623,4 +633,7 @@ struct TCatalogObject {
// Set iff object type is HDFS_CACHE_POOL
10: optional THdfsCachePool cache_pool
+
+ // Set iff object type is AUTHZ_CACHE_INVALIDATION
+ 11: optional TAuthzCacheInvalidation authz_cache_invalidation
}
diff --git a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
index d4b205a..7d643cb 100644
--- a/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/AuthorizationChecker.java
@@ -127,4 +127,9 @@ public abstract class AuthorizationChecker {
public abstract void authorizeRowFilterAndColumnMask(User user,
List<PrivilegeRequest> privilegeRequests)
throws AuthorizationException, InternalException;
+
+ /**
+ * Invalidates an authorization cache.
+ */
+ public abstract void invalidateAuthorizationCache();
}
diff --git a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
index 7284947..ed77fe1 100644
--- a/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/NoopAuthorizationFactory.java
@@ -199,6 +199,9 @@ public class NoopAuthorizationFactory implements AuthorizationFactory {
List<PrivilegeRequest> privilegeRequests)
throws AuthorizationException, InternalException {
}
+
+ @Override
+ public void invalidateAuthorizationCache() {}
};
}
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
index 561e6c2..1a0f2df 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationChecker.java
@@ -36,6 +36,8 @@ import org.apache.ranger.plugin.policyengine.RangerAccessRequestImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResourceImpl;
import org.apache.ranger.plugin.policyengine.RangerAccessResult;
import org.apache.ranger.plugin.policyengine.RangerPolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -50,6 +52,9 @@ import java.util.Set;
* Ranger plugin uses its own cache.
*/
public class RangerAuthorizationChecker extends AuthorizationChecker {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ RangerAuthorizationChecker.class);
+
// These are Ranger access types (privileges).
public static final String UPDATE_ACCESS_TYPE = "update";
public static final String REFRESH_ACCESS_TYPE = "read";
@@ -174,6 +179,17 @@ public class RangerAuthorizationChecker extends AuthorizationChecker {
}
}
+ @Override
+ public void invalidateAuthorizationCache() {
+ long startTime = System.currentTimeMillis();
+ try {
+ plugin_.refreshPoliciesAndTags();
+ } finally {
+ LOG.debug("Refreshing Ranger policies took {} ms",
+ (System.currentTimeMillis() - startTime));
+ }
+ }
+
/**
* This method checks if column mask is enabled on the given columns and deny access
* when column mask is enabled by throwing an {@link AuthorizationException}. This is
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
index f7248cc..13eac99 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerAuthorizationFactory.java
@@ -90,6 +90,6 @@ public class RangerAuthorizationFactory implements AuthorizationFactory {
RangerImpalaPlugin plugin = new RangerImpalaPlugin(config.getServiceType(),
config.getAppId());
plugin.init();
- return new RangerCatalogdAuthorizationManager(() -> plugin);
+ return new RangerCatalogdAuthorizationManager(() -> plugin, catalog);
}
}
diff --git a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerCatalogdAuthorizationManager.java b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerCatalogdAuthorizationManager.java
index f772c00..87ca927 100644
--- a/fe/src/main/java/org/apache/impala/authorization/ranger/RangerCatalogdAuthorizationManager.java
+++ b/fe/src/main/java/org/apache/impala/authorization/ranger/RangerCatalogdAuthorizationManager.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.User;
+import org.apache.impala.catalog.AuthzCacheInvalidation;
+import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.thrift.TCreateDropRoleParams;
@@ -55,12 +57,17 @@ import java.util.function.Supplier;
* Operations not supported by Ranger will throw an {@link UnsupportedOperationException}.
*/
public class RangerCatalogdAuthorizationManager implements AuthorizationManager {
+ private static final String AUTHZ_CACHE_INVALIDATION_MARKER = "ranger";
+
private final RangerDefaultAuditHandler auditHandler_;
private final Supplier<RangerImpalaPlugin> plugin_;
+ private final CatalogServiceCatalog catalog_;
- public RangerCatalogdAuthorizationManager(Supplier<RangerImpalaPlugin> pluginSupplier) {
+ public RangerCatalogdAuthorizationManager(Supplier<RangerImpalaPlugin> pluginSupplier,
+ CatalogServiceCatalog catalog) {
auditHandler_ = new RangerDefaultAuditHandler();
plugin_ = pluginSupplier;
+ catalog_ = catalog;
}
@Override
@@ -119,6 +126,7 @@ public class RangerCatalogdAuthorizationManager implements AuthorizationManager
plugin_.get().getClusterName(), params.getPrivileges());
grantPrivilege(requests);
+ refreshAuthorization(response);
}
@Override
@@ -129,6 +137,7 @@ public class RangerCatalogdAuthorizationManager implements AuthorizationManager
plugin_.get().getClusterName(), params.getPrivileges());
revokePrivilege(requests);
+ refreshAuthorization(response);
}
@Override
@@ -140,6 +149,7 @@ public class RangerCatalogdAuthorizationManager implements AuthorizationManager
plugin_.get().getClusterName(), params.getPrivileges());
grantPrivilege(requests);
+ refreshAuthorization(response);
}
@Override
@@ -151,6 +161,9 @@ public class RangerCatalogdAuthorizationManager implements AuthorizationManager
plugin_.get().getClusterName(), params.getPrivileges());
revokePrivilege(requests);
+ // Update the authorization refresh marker so that the Impalads can refresh their
+ // Ranger caches.
+ refreshAuthorization(response);
}
@VisibleForTesting
@@ -196,8 +209,22 @@ public class RangerCatalogdAuthorizationManager implements AuthorizationManager
@Override
public AuthorizationDelta refreshAuthorization(boolean resetVersions) {
- // TODO: IMPALA-8293 (part 2)
- return new AuthorizationDelta();
+ // Add a single AUTHZ_CACHE_INVALIDATION catalog object called "ranger" and increment
+ // its version to indicate a new cache invalidation request.
+ AuthorizationDelta authzDelta = new AuthorizationDelta();
+ AuthzCacheInvalidation authzCacheInvalidation =
+ catalog_.incrementAuthzCacheInvalidationVersion(AUTHZ_CACHE_INVALIDATION_MARKER);
+ authzDelta.addCatalogObjectAdded(authzCacheInvalidation.toTCatalogObject());
+ return authzDelta;
+ }
+
+ private void refreshAuthorization(TDdlExecResponse response) {
+ // Update the authorization cache invalidation marker so that the Impalads can
+ // invalidate their Ranger caches. This is needed for usability reasons, to make sure
+ // what's updated in Ranger via grant/revoke is automatically reflected in the
+ // Impalad Ranger plugins.
+ AuthorizationDelta authzDelta = refreshAuthorization(false);
+ response.result.setUpdated_catalog_objects(authzDelta.getCatalogObjectsAdded());
}
public static List<GrantRevokeRequest> createGrantRevokeRequests(String grantor,
diff --git a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
index 209b7ca..c570600 100644
--- a/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
+++ b/fe/src/main/java/org/apache/impala/authorization/sentry/SentryAuthorizationChecker.java
@@ -80,6 +80,11 @@ public class SentryAuthorizationChecker extends AuthorizationChecker {
throws AuthorizationException, InternalException {
}
+ @Override
+ public void invalidateAuthorizationCache() {
+ // Authorization refresh in Sentry is done by updating {@link AuthorizationPolicy}.
+ }
+
/*
* Creates a new ResourceAuthorizationProvider based on the given configuration.
*/
diff --git a/fe/src/main/java/org/apache/impala/catalog/AuthzCacheInvalidation.java b/fe/src/main/java/org/apache/impala/catalog/AuthzCacheInvalidation.java
new file mode 100644
index 0000000..62b35a0
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/AuthzCacheInvalidation.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.thrift.TAuthzCacheInvalidation;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+
+/**
+ * This catalog object is used for authorization cache invalidation notification.
+ */
+public class AuthzCacheInvalidation extends CatalogObjectImpl {
+ private final TAuthzCacheInvalidation authzCacheInvalidation_;
+
+ public AuthzCacheInvalidation(String markerName) {
+ this(new TAuthzCacheInvalidation(markerName));
+ }
+
+ public AuthzCacheInvalidation(TAuthzCacheInvalidation authzCacheInvalidation) {
+ authzCacheInvalidation_ = Preconditions.checkNotNull(authzCacheInvalidation);
+ }
+
+ @Override
+ protected void setTCatalogObject(TCatalogObject catalogObject) {
+ catalogObject.setAuthz_cache_invalidation(authzCacheInvalidation_);
+ }
+
+ @Override
+ public TCatalogObjectType getCatalogObjectType() {
+ return TCatalogObjectType.AUTHZ_CACHE_INVALIDATION;
+ }
+
+ @Override
+ public String getName() { return authzCacheInvalidation_.getMarker_name(); }
+
+ public TAuthzCacheInvalidation toThrift() { return authzCacheInvalidation_; }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index c61e20a..2313d65 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -84,6 +84,10 @@ public abstract class Catalog implements AutoCloseable {
protected final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
new CatalogObjectCache<HdfsCachePool>(false);
+ // Cache of authorization cache invalidation markers.
+ protected final CatalogObjectCache<AuthzCacheInvalidation> authzCacheInvalidation_ =
+ new CatalogObjectCache<>();
+
/**
* Creates a new instance of Catalog backed by a given MetaStoreClientPool.
*/
@@ -320,6 +324,13 @@ public abstract class Catalog implements AutoCloseable {
}
/**
+ * Gets the {@link AuthzCacheInvalidation} for a given marker name.
+ */
+ public AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) {
+ return authzCacheInvalidation_.get(Preconditions.checkNotNull(markerName));
+ }
+
+ /**
* Release the Hive Meta Store Client resources. Can be called multiple times
* (additional calls will be no-ops).
*/
@@ -533,6 +544,19 @@ public abstract class Catalog implements AutoCloseable {
throw new CatalogException(String.format("%s '%s' does not contain " +
"privilege: '%s'", Principal.toString(tmpPrincipal.getPrincipalType()),
tmpPrincipal.getName(), privilegeName));
+ case AUTHZ_CACHE_INVALIDATION:
+ AuthzCacheInvalidation authzCacheInvalidation = getAuthzCacheInvalidation(
+ objectDesc.getAuthz_cache_invalidation().getMarker_name());
+ if (authzCacheInvalidation == null) {
+ // Authorization cache invalidation requires a single catalog object and it
+ // needs to exist.
+ throw new CatalogException("Authz cache invalidation not found: " +
+ objectDesc.getAuthz_cache_invalidation().getMarker_name());
+ }
+ result.setType(authzCacheInvalidation.getCatalogObjectType());
+ result.setCatalog_version(authzCacheInvalidation.getCatalogVersion());
+ result.setAuthz_cache_invalidation(authzCacheInvalidation.toThrift());
+ break;
default: throw new IllegalStateException(
"Unexpected TCatalogObject type: " + objectDesc.getType());
}
@@ -587,6 +611,9 @@ public abstract class Catalog implements AutoCloseable {
catalogObject.getCache_pool().getPool_name().toLowerCase();
case DATA_SOURCE:
return "DATA_SOURCE:" + catalogObject.getData_source().getName().toLowerCase();
+ case AUTHZ_CACHE_INVALIDATION:
+ return "AUTHZ_CACHE_INVALIDATION:" + catalogObject.getAuthz_cache_invalidation()
+ .getMarker_name().toLowerCase();
case CATALOG:
return "CATALOG_SERVICE_ID";
default:
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 62f4b50..5f74c45 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -638,6 +638,7 @@ public class CatalogServiceCatalog extends Catalog {
return obj;
case PRIVILEGE:
case PRINCIPAL:
+ case AUTHZ_CACHE_INVALIDATION:
// The caching of this data on the impalad side is somewhat complex
// and this code is also under some churn at the moment. So, we'll just publish
// the full information rather than doing fetch-on-demand.
@@ -687,6 +688,9 @@ public class CatalogServiceCatalog extends Catalog {
for (User user: getAllUsers()) {
addPrincipalToCatalogDelta(user, ctx);
}
+ for (AuthzCacheInvalidation authzCacheInvalidation: getAllAuthzCacheInvalidation()) {
+ addAuthzCacheInvalidationToCatalogDelta(authzCacheInvalidation, ctx);
+ }
// Identify the catalog objects that were removed from the catalog for which their
// versions are in range ('ctx.fromVersion', 'ctx.toVersion']. We need to make sure
// that we don't include "deleted" objects that were re-added to the catalog.
@@ -895,6 +899,18 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
+ * Get a snapshot view of all authz cache invalidation markers in the catalog.
+ */
+ private List<AuthzCacheInvalidation> getAllAuthzCacheInvalidation() {
+ versionLock_.readLock().lock();
+ try {
+ return ImmutableList.copyOf(authzCacheInvalidation_);
+ } finally {
+ versionLock_.readLock().unlock();
+ }
+ }
+
+ /**
* Adds a database in the topic update if its version is in the range
* ('ctx.fromVersion', 'ctx.toVersion']. It iterates through all the tables and
* functions of this database to determine if they can be included in the topic update.
@@ -1181,6 +1197,22 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
+ * Adds an authz cache invalidation to the topic update if its version is in the range
+ * ('ctx.fromVersion', 'ctx.toVersion'].
+ */
+ private void addAuthzCacheInvalidationToCatalogDelta(
+ AuthzCacheInvalidation authzCacheInvalidation, GetCatalogDeltaContext ctx)
+ throws TException {
+ long authzCacheInvalidationVersion = authzCacheInvalidation.getCatalogVersion();
+ if (authzCacheInvalidationVersion <= ctx.fromVersion ||
+ authzCacheInvalidationVersion > ctx.toVersion) return;
+ TCatalogObject catalogObj = new TCatalogObject(
+ TCatalogObjectType.AUTHZ_CACHE_INVALIDATION, authzCacheInvalidationVersion);
+ catalogObj.setAuthz_cache_invalidation(authzCacheInvalidation.toThrift());
+ ctx.addCatalogObject(catalogObj, false);
+ }
+
+ /**
* Returns all user defined functions (aggregate and scalar) in the specified database.
* Functions are not returned in a defined order.
*/
@@ -2368,6 +2400,42 @@ public class CatalogServiceCatalog extends Catalog {
}
}
+ @Override
+ public AuthzCacheInvalidation getAuthzCacheInvalidation(String markerName) {
+ versionLock_.readLock().lock();
+ try {
+ return authzCacheInvalidation_.get(markerName);
+ } finally {
+ versionLock_.readLock().unlock();;
+ }
+ }
+
+ /**
+ * Gets the {@link AuthzCacheInvalidation} for a given marker name or creates a new
+ * {@link AuthzCacheInvalidation} if it does not exist and increments the catalog
+ * version of {@link AuthzCacheInvalidation}. A catalog version update indicates
+ * an authorization cache invalidation notification.
+ *
+ * @param markerName the authorization cache invalidation marker name
+ * @return the updated {@link AuthzCacheInvalidation} instance
+ */
+ public AuthzCacheInvalidation incrementAuthzCacheInvalidationVersion(
+ String markerName) {
+ versionLock_.writeLock().lock();
+ try {
+ AuthzCacheInvalidation authzCacheInvalidation = getAuthzCacheInvalidation(
+ markerName);
+ if (authzCacheInvalidation == null) {
+ authzCacheInvalidation = new AuthzCacheInvalidation(markerName);
+ authzCacheInvalidation_.add(authzCacheInvalidation);
+ }
+ authzCacheInvalidation.setCatalogVersion(incrementAndGetCatalogVersion());
+ return authzCacheInvalidation;
+ } finally {
+ versionLock_.writeLock().unlock();
+ }
+ }
+
/**
* Increments the current Catalog version and returns the new value.
*/
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 4c1f975..13cb620 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -21,12 +21,15 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.impala.analysis.TableName;
+import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TAuthzCacheInvalidation;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TDataSource;
@@ -92,9 +95,12 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
// The addresses of the Kudu masters to use if no Kudu masters were explicitly provided.
// Used during table creation.
private final String defaultKuduMasterHosts_;
+ private final AtomicReference<? extends AuthorizationChecker> authzChecker_;
- public ImpaladCatalog(String defaultKuduMasterHosts) {
+ public ImpaladCatalog(String defaultKuduMasterHosts,
+ AtomicReference<? extends AuthorizationChecker> authzChecker) {
super();
+ authzChecker_ = authzChecker;
addDb(BuiltinsDb.getInstance());
defaultKuduMasterHosts_ = defaultKuduMasterHosts;
// Ensure the contents of the CatalogObjectVersionSet instance are cleared when a
@@ -142,7 +148,8 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
return catalogObject.getType() == TCatalogObjectType.DATABASE ||
catalogObject.getType() == TCatalogObjectType.DATA_SOURCE ||
catalogObject.getType() == TCatalogObjectType.HDFS_CACHE_POOL ||
- catalogObject.getType() == TCatalogObjectType.PRINCIPAL;
+ catalogObject.getType() == TCatalogObjectType.PRINCIPAL ||
+ catalogObject.getType() == TCatalogObjectType.AUTHZ_CACHE_INVALIDATION;
}
}
@@ -312,6 +319,13 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
cachePool.setCatalogVersion(catalogObject.getCatalog_version());
hdfsCachePools_.add(cachePool);
break;
+ case AUTHZ_CACHE_INVALIDATION:
+ AuthzCacheInvalidation authzCacheInvalidation = new AuthzCacheInvalidation(
+ catalogObject.getAuthz_cache_invalidation());
+ authzCacheInvalidation.setCatalogVersion(catalogObject.getCatalog_version());
+ authzCacheInvalidation_.add(authzCacheInvalidation);
+ authzChecker_.get().invalidateAuthorizationCache();
+ break;
default:
throw new IllegalStateException(
"Unexpected TCatalogObjectType: " + catalogObject.getType());
@@ -354,6 +368,10 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
hdfsCachePools_.remove(catalogObject.getCache_pool().getPool_name());
}
break;
+ case AUTHZ_CACHE_INVALIDATION:
+ removeAuthzCacheInvalidation(catalogObject.getAuthz_cache_invalidation(),
+ dropCatalogVersion);
+ break;
default:
throw new IllegalStateException(
"Unexpected TCatalogObjectType: " + catalogObject.getType());
@@ -477,6 +495,15 @@ public class ImpaladCatalog extends Catalog implements FeCatalog {
}
}
+ private void removeAuthzCacheInvalidation(
+ TAuthzCacheInvalidation authzCacheInvalidation, long dropCatalogVersion) {
+ AuthzCacheInvalidation existingItem = authzCacheInvalidation_.get(
+ authzCacheInvalidation.getMarker_name());
+ if (existingItem != null && existingItem.getCatalogVersion() < dropCatalogVersion) {
+ authzCacheInvalidation_.remove(authzCacheInvalidation.getMarker_name());
+ }
+ }
+
@Override // FeCatalog
public boolean isReady() {
return lastSyncedCatalogVersion_.get() > INITIAL_CATALOG_VERSION;
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 368729d..a15d889 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -37,10 +38,13 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.AuthzCacheInvalidation;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.CatalogDeltaLog;
import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogObjectCache;
import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
@@ -264,6 +268,10 @@ public class CatalogdMetaProvider implements MetaProvider {
* StateStore. Currently this is _not_ "fetch-on-demand".
*/
private final AuthorizationPolicy authPolicy_ = new AuthorizationPolicy();
+ // Cache of authorization cache invalidation markers.
+ private final CatalogObjectCache<AuthzCacheInvalidation> authzCacheInvalidation_ =
+ new CatalogObjectCache<>();
+ private AtomicReference<? extends AuthorizationChecker> authzChecker_;
public CatalogdMetaProvider(TBackendGflags flags) {
Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s());
@@ -305,6 +313,11 @@ public class CatalogdMetaProvider implements MetaProvider {
return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION;
}
+ public void setAuthzChecker(
+ AtomicReference<? extends AuthorizationChecker> authzChecker) {
+ authzChecker_ = authzChecker;
+ }
+
/**
* Send a GetPartialCatalogObject request to catalogd. This handles converting
* non-OK status responses back to exceptions, performing various generic sanity
@@ -909,7 +922,8 @@ public class CatalogdMetaProvider implements MetaProvider {
// may be cross-referential. So, just add them to the sequencer which ensures
// we handle them in the right order later.
if (obj.type == TCatalogObjectType.PRINCIPAL ||
- obj.type == TCatalogObjectType.PRIVILEGE) {
+ obj.type == TCatalogObjectType.PRIVILEGE ||
+ obj.type == TCatalogObjectType.AUTHZ_CACHE_INVALIDATION) {
authObjectSequencer.add(obj, isDelete);
}
@@ -1001,6 +1015,19 @@ public class CatalogdMetaProvider implements MetaProvider {
obj.getCatalog_version());
}
break;
+ case AUTHZ_CACHE_INVALIDATION:
+ if (!isDelete) {
+ AuthzCacheInvalidation authzCacheInvalidation = new AuthzCacheInvalidation(
+ obj.getAuthz_cache_invalidation());
+ authzCacheInvalidation.setCatalogVersion(obj.getCatalog_version());
+ authzCacheInvalidation_.add(authzCacheInvalidation);
+ Preconditions.checkState(authzChecker_ != null);
+ authzChecker_.get().invalidateAuthorizationCache();
+ } else {
+ authzCacheInvalidation_.remove(obj.getAuthz_cache_invalidation()
+ .getMarker_name());
+ }
+ break;
default:
throw new IllegalArgumentException("invalid type: " + obj.type);
}
diff --git a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
index 75e7e16..ae7c471 100644
--- a/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
+++ b/fe/src/main/java/org/apache/impala/service/FeCatalogManager.java
@@ -18,6 +18,8 @@ package org.apache.impala.service;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.AuthorizationChecker;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeCatalog;
import org.apache.impala.catalog.ImpaladCatalog;
@@ -39,6 +41,8 @@ public abstract class FeCatalogManager {
private static String DEFAULT_KUDU_MASTER_HOSTS =
BackendConfig.INSTANCE.getBackendCfg().kudu_master_hosts;
+ protected AtomicReference<? extends AuthorizationChecker> authzChecker_;
+
/**
* @return the appropriate implementation based on the current backend
* configuration.
@@ -59,6 +63,11 @@ public abstract class FeCatalogManager {
return new TestImpl(testCatalog);
}
+ public void setAuthzChecker(
+ AtomicReference<? extends AuthorizationChecker> authzChecker) {
+ authzChecker_ = Preconditions.checkNotNull(authzChecker);
+ }
+
/**
* @return a Catalog instance to be used for a request or query. Depending
* on the catalog implementation this may either be a reused instance or a
@@ -119,7 +128,7 @@ public abstract class FeCatalogManager {
}
private ImpaladCatalog createNewCatalog() {
- return new ImpaladCatalog(DEFAULT_KUDU_MASTER_HOSTS);
+ return new ImpaladCatalog(DEFAULT_KUDU_MASTER_HOSTS, authzChecker_);
}
}
@@ -133,6 +142,7 @@ public abstract class FeCatalogManager {
@Override
public FeCatalog getOrCreateCatalog() {
+ PROVIDER.setAuthzChecker(authzChecker_);
return new LocalCatalog(PROVIDER, DEFAULT_KUDU_MASTER_HOSTS);
}
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 16f1580..cb9f9ab 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -263,6 +263,7 @@ public class Frontend {
} else {
authzChecker_.set(authzFactory.newAuthorizationChecker());
}
+ catalogManager_.setAuthzChecker(authzChecker_);
authzManager_ = authzFactory.newAuthorizationManager(catalogManager_,
authzChecker_::get);
impaladTableUsageTracker_ = ImpaladTableUsageTracker.createFromConfig(
diff --git a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
index 0a9e64a..df543cf 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AuthorizationStmtTest.java
@@ -3276,7 +3276,7 @@ public class AuthorizationStmtTest extends FrontendTestBase {
private abstract class WithRanger implements WithPrincipal {
private final List<GrantRevokeRequest> requests = new ArrayList<>();
private final RangerCatalogdAuthorizationManager authzManager =
- new RangerCatalogdAuthorizationManager(() -> rangerImpalaPlugin_);
+ new RangerCatalogdAuthorizationManager(() -> rangerImpalaPlugin_, null);
@Override
public void init(TPrivilege[]... privileges) throws ImpalaException {
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 59cd604..8e95f3c 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -350,6 +350,9 @@ public class FrontendTestBase extends AbstractFrontendTest {
List<PrivilegeRequest> privilegeRequests)
throws AuthorizationException, InternalException {
}
+
+ @Override
+ public void invalidateAuthorizationCache() {}
};
}
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 45cbed6..81bb9bd 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -55,7 +55,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
* Takes an {@link AuthorizationFactory} to bootstrap the backing CatalogServiceCatalog.
*/
public ImpaladTestCatalog(AuthorizationFactory authzFactory) {
- super("127.0.0.1");
+ super("127.0.0.1", null);
CatalogServiceCatalog catalogServerCatalog =
CatalogServiceTestCatalog.createWithAuth(authzFactory);
authPolicy_ = catalogServerCatalog.getAuthPolicy();
@@ -68,7 +68,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
* Creates ImpaladTestCatalog backed by a given catalog instance.
*/
public ImpaladTestCatalog(CatalogServiceCatalog catalog) {
- super("127.0.0.1");
+ super("127.0.0.1", null);
srcCatalog_ = Preconditions.checkNotNull(catalog);
authPolicy_ = srcCatalog_.getAuthPolicy();
setIsReady(true);
diff --git a/fe/src/test/resources/ranger-hive-security.xml b/fe/src/test/resources/ranger-hive-security.xml
index 4cc3160..4fbda3e 100644
--- a/fe/src/test/resources/ranger-hive-security.xml
+++ b/fe/src/test/resources/ranger-hive-security.xml
@@ -44,7 +44,7 @@
<property>
<name>ranger.plugin.hive.policy.pollIntervalMs</name>
- <value>5000</value>
+ <value>30000</value>
<description>
Polling interval in milliseconds to poll for changes in policies.
</description>
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index f2900b0..b1e62c1 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -21,7 +21,6 @@ import grp
import json
import pytest
import requests
-import time
from getpass import getuser
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
@@ -45,7 +44,8 @@ class TestRanger(CustomClusterTestSuite):
impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
def test_grant_revoke_with_catalog_v1(self, unique_name):
"""Tests grant/revoke with catalog v1."""
- self._test_grant_revoke(unique_name)
+ self._test_grant_revoke(unique_name, [None, "invalidate metadata",
+ "refresh authorization"])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -55,9 +55,10 @@ class TestRanger(CustomClusterTestSuite):
"--catalog_topic_mode=minimal"))
def test_grant_revoke_with_local_catalog(self, unique_name):
"""Tests grant/revoke with catalog v2 (local catalog)."""
- self._test_grant_revoke(unique_name)
+ # Catalog v2 does not support global invalidate metadata.
+ self._test_grant_revoke(unique_name, [None, "refresh authorization"])
- def _test_grant_revoke(self, unique_name):
+ def _test_grant_revoke(self, unique_name, refresh_statements):
user = getuser()
admin_client = self.create_impala_client()
unique_database = unique_name + "_db"
@@ -65,42 +66,41 @@ class TestRanger(CustomClusterTestSuite):
group = grp.getgrnam(getuser()).gr_name
test_data = [(user, "USER"), (group, "GROUP")]
- for data in test_data:
- ident = data[0]
- kw = data[1]
-
- try:
- # Set-up temp database/table
- admin_client.execute("drop database if exists {0} cascade"
- .format(unique_database), user=ADMIN)
- admin_client.execute("create database {0}".format(unique_database), user=ADMIN)
- admin_client.execute("create table {0}.{1} (x int)"
- .format(unique_database, unique_table), user=ADMIN)
-
- self.execute_query_expect_success(admin_client,
- "grant select on database {0} to {1} {2}"
- .format(unique_database, kw, ident), user=ADMIN)
- # TODO: IMPALA-8293 use refresh authorization
- time.sleep(10)
- result = self.execute_query("show grant {0} {1} on database {2}"
- .format(kw, ident, unique_database))
- TestRanger._check_privileges(result, [
- [kw, ident, unique_database, "", "", "", "*", "select", "false"],
- [kw, ident, unique_database, "*", "*", "", "", "select", "false"]])
- self.execute_query_expect_success(admin_client,
- "revoke select on database {0} from {1} "
- "{2}".format(unique_database, kw, ident),
- user=ADMIN)
- # TODO: IMPALA-8293 use refresh authorization
- time.sleep(10)
- result = self.execute_query("show grant {0} {1} on database {2}"
- .format(kw, ident, unique_database))
- TestRanger._check_privileges(result, [])
- finally:
- admin_client.execute("revoke select on database {0} from {1} {2}"
- .format(unique_database, kw, ident), user=ADMIN)
- admin_client.execute("drop database if exists {0} cascade"
- .format(unique_database), user=ADMIN)
+ for refresh_stmt in refresh_statements:
+ for data in test_data:
+ ident = data[0]
+ kw = data[1]
+ try:
+ # Set-up temp database/table
+ admin_client.execute("drop database if exists {0} cascade"
+ .format(unique_database), user=ADMIN)
+ admin_client.execute("create database {0}".format(unique_database), user=ADMIN)
+ admin_client.execute("create table {0}.{1} (x int)"
+ .format(unique_database, unique_table), user=ADMIN)
+
+ self.execute_query_expect_success(admin_client,
+ "grant select on database {0} to {1} {2}"
+ .format(unique_database, kw, ident),
+ user=ADMIN)
+ self._refresh_authorization(admin_client, refresh_stmt)
+ result = self.execute_query("show grant {0} {1} on database {2}"
+ .format(kw, ident, unique_database))
+ TestRanger._check_privileges(result, [
+ [kw, ident, unique_database, "", "", "", "*", "select", "false"],
+ [kw, ident, unique_database, "*", "*", "", "", "select", "false"]])
+ self.execute_query_expect_success(admin_client,
+ "revoke select on database {0} from {1} "
+ "{2}".format(unique_database, kw, ident),
+ user=ADMIN)
+ self._refresh_authorization(admin_client, refresh_stmt)
+ result = self.execute_query("show grant {0} {1} on database {2}"
+ .format(kw, ident, unique_database))
+ TestRanger._check_privileges(result, [])
+ finally:
+ admin_client.execute("revoke select on database {0} from {1} {2}"
+ .format(unique_database, kw, ident), user=ADMIN)
+ admin_client.execute("drop database if exists {0} cascade"
+ .format(unique_database), user=ADMIN)
@CustomClusterTestSuite.with_args(
impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
@@ -123,8 +123,7 @@ class TestRanger(CustomClusterTestSuite):
"grant select on database {0} to user {1} with "
"grant option".format(unique_database, user1),
user=ADMIN)
- # TODO: IMPALA-8293 use refresh authorization
- time.sleep(10)
+
# Verify user 1 has with_grant privilege on unique_database
result = self.execute_query("show grant user {0} on database {1}"
.format(user1, unique_database))
@@ -136,8 +135,7 @@ class TestRanger(CustomClusterTestSuite):
self.execute_query_expect_success(admin_client, "revoke grant option for select "
"on database {0} from user {1}"
.format(unique_database, user1), user=ADMIN)
- # TODO: IMPALA-8293 use refresh authorization
- time.sleep(10)
+
# User 1 can no longer grant privileges on unique_database
result = self.execute_query("show grant user {0} on database {1}"
.format(user1, unique_database))
@@ -196,7 +194,6 @@ class TestRanger(CustomClusterTestSuite):
admin_client.execute("grant select on database {0} to group {1}"
.format(unique_db, group))
- time.sleep(10)
result = self.client.execute("show grant user {0} on database {1}"
.format(user, unique_db))
TestRanger._check_privileges(result, [
@@ -211,7 +208,6 @@ class TestRanger(CustomClusterTestSuite):
try:
for privilege in privileges:
admin_client.execute("grant {0} on server to user {1}".format(privilege, user))
- time.sleep(10)
result = self.client.execute("show grant user {0} on server".format(user))
TestRanger._check_privileges(result, [
["USER", user, "", "", "", "*", "", "alter", "false"],
@@ -231,7 +227,6 @@ class TestRanger(CustomClusterTestSuite):
["USER", user, "*", "*", "*", "", "", "select", "false"]])
admin_client.execute("grant all on server to user {0}".format(user))
- time.sleep(10)
result = self.client.execute("show grant user {0} on server".format(user))
TestRanger._check_privileges(result, [
["USER", user, "", "", "", "*", "", "all", "false"],
@@ -246,7 +241,6 @@ class TestRanger(CustomClusterTestSuite):
try:
# Grant server privileges and verify
admin_client.execute("grant all on server to {0} {1}".format(kw, id), user=ADMIN)
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on server".format(kw, id))
TestRanger._check_privileges(result, [
[kw, id, "", "", "", "*", "", "all", "false"],
@@ -255,14 +249,12 @@ class TestRanger(CustomClusterTestSuite):
# Revoke server privileges and verify
admin_client.execute("revoke all on server from {0} {1}".format(kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on server".format(kw, id))
TestRanger._check_privileges(result, [])
# Grant uri privileges and verify
admin_client.execute("grant all on uri '{0}' to {1} {2}"
.format(uri, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on uri '{2}'"
.format(kw, id, uri))
TestRanger._check_privileges(result, [
@@ -271,7 +263,6 @@ class TestRanger(CustomClusterTestSuite):
# Revoke uri privileges and verify
admin_client.execute("revoke all on uri '{0}' from {1} {2}"
.format(uri, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on uri '{2}'"
.format(kw, id, uri))
TestRanger._check_privileges(result, [])
@@ -279,7 +270,6 @@ class TestRanger(CustomClusterTestSuite):
# Grant database privileges and verify
admin_client.execute("grant select on database {0} to {1} {2}"
.format(unique_database, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on database {2}"
.format(kw, id, unique_database))
TestRanger._check_privileges(result, [
@@ -289,7 +279,6 @@ class TestRanger(CustomClusterTestSuite):
# Revoke database privileges and verify
admin_client.execute("revoke select on database {0} from {1} {2}"
.format(unique_database, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on database {2}"
.format(kw, id, unique_database))
TestRanger._check_privileges(result, [])
@@ -297,7 +286,6 @@ class TestRanger(CustomClusterTestSuite):
# Grant table privileges and verify
admin_client.execute("grant select on table {0}.{1} to {2} {3}"
.format(unique_database, unique_table, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on table {2}.{3}"
.format(kw, id, unique_database, unique_table))
TestRanger._check_privileges(result, [
@@ -306,7 +294,6 @@ class TestRanger(CustomClusterTestSuite):
# Revoke table privileges and verify
admin_client.execute("revoke select on table {0}.{1} from {2} {3}"
.format(unique_database, unique_table, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on table {2}.{3}"
.format(kw, id, unique_database, unique_table))
TestRanger._check_privileges(result, [])
@@ -314,7 +301,6 @@ class TestRanger(CustomClusterTestSuite):
# Grant column privileges and verify
admin_client.execute("grant select(x) on table {0}.{1} to {2} {3}"
.format(unique_database, unique_table, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on column {2}.{3}.x"
.format(kw, id, unique_database, unique_table))
TestRanger._check_privileges(result, [
@@ -323,7 +309,6 @@ class TestRanger(CustomClusterTestSuite):
# Revoke column privileges and verify
admin_client.execute("revoke select(x) on table {0}.{1} from {2} {3}"
.format(unique_database, unique_table, kw, id))
- time.sleep(10)
result = self.client.execute("show grant {0} {1} on column {2}.{3}.x"
.format(kw, id, unique_database, unique_table))
TestRanger._check_privileges(result, [])
@@ -359,3 +344,7 @@ class TestRanger(CustomClusterTestSuite):
return cols[0:len(cols) - 1]
assert map(columns, result.data) == expected
+
+ def _refresh_authorization(self, client, statement):
+ if statement is not None:
+ self.execute_query_expect_success(client, statement)