You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2020/04/10 18:19:06 UTC

[kudu] branch master updated (14912a1 -> dacb820)

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 14912a1  [catalog_manager] reduce contention in ScopedLeaderSharedLock
     new e7044a5  [ranger] Fix and refactor RangerClient
     new dacb820  KUDU-3078 Add Ranger tests to master_authz-itest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/integration-tests/CMakeLists.txt          |    2 +-
 src/kudu/integration-tests/hms_itest-base.cc       |   64 +-
 src/kudu/integration-tests/hms_itest-base.h        |   44 +-
 ...aster_sentry-itest.cc => master_authz-itest.cc} | 1079 ++++++++++++++------
 src/kudu/integration-tests/master_hms-itest.cc     |  114 ++-
 src/kudu/integration-tests/ts_sentry-itest.cc      |  146 ++-
 src/kudu/master/ranger_authz_provider.cc           |  209 +++-
 src/kudu/ranger/mini_ranger-test.cc                |   19 +-
 src/kudu/ranger/mini_ranger.cc                     |   27 +-
 src/kudu/ranger/mini_ranger.h                      |   21 +-
 src/kudu/ranger/ranger_client-test.cc              |   42 +-
 src/kudu/ranger/ranger_client.cc                   |   94 +-
 src/kudu/ranger/ranger_client.h                    |   40 +-
 13 files changed, 1266 insertions(+), 635 deletions(-)
 rename src/kudu/integration-tests/{master_sentry-itest.cc => master_authz-itest.cc} (56%)


[kudu] 02/02: KUDU-3078 Add Ranger tests to master_authz-itest

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit dacb8203f05ca2d873d524501214bfe5144ac9bb
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Fri Apr 10 17:04:58 2020 +0200

    KUDU-3078 Add Ranger tests to master_authz-itest
    
    This commit refactors the existing Sentry integration tests to prepare
    for Ranger integration tests.
    
    It changes HMS and Sentry integration test base classes to test
    harnesses that don't inherit from Gtest to simplify inheritance when
    using typed tests.
    
    master_sentry-itest is renamed to master_authz-itest to generalize it,
    and most tests are changed to also run with Ranger. The tests that still
    run only with Sentry cover behavior specific to Sentry. A later patch
    will do the same for ts_sentry-itest.
    
    Ranger doesn't support adding new policy items (users and privileges) to
    existing policies, so MiniRanger::AddPolicy stores policies in a member
    variable and recreates them completely when a new item is added to an
    existing policy (resource).
    
    Change-Id: I25dc67516cd61f0624914989f8db4c4f94d7e3bf
    Reviewed-on: http://gerrit.cloudera.org:8080/15681
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/integration-tests/CMakeLists.txt          |    2 +-
 src/kudu/integration-tests/hms_itest-base.cc       |   64 +-
 src/kudu/integration-tests/hms_itest-base.h        |   44 +-
 ...aster_sentry-itest.cc => master_authz-itest.cc} | 1079 ++++++++++++++------
 src/kudu/integration-tests/master_hms-itest.cc     |  114 ++-
 src/kudu/integration-tests/ts_sentry-itest.cc      |  146 ++-
 src/kudu/ranger/mini_ranger-test.cc                |   19 +-
 src/kudu/ranger/mini_ranger.cc                     |   27 +-
 src/kudu/ranger/mini_ranger.h                      |   21 +-
 9 files changed, 1047 insertions(+), 469 deletions(-)

diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 4663f0d..ea63c21 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -86,6 +86,7 @@ ADD_KUDU_TEST(linked_list-test RUN_SERIAL true)
 ADD_KUDU_TEST(log-rolling-itest)
 ADD_KUDU_TEST(maintenance_mode-itest NUM_SHARDS 8
   DATA_FILES ../scripts/assign-location.py)
+ADD_KUDU_TEST(master_authz-itest RUN_SERIAL true NUM_SHARDS 8 PROCESSORS 4)
 ADD_KUDU_TEST(master_cert_authority-itest PROCESSORS 2)
 ADD_KUDU_TEST(master_failover-itest NUM_SHARDS 4 PROCESSORS 3)
 ADD_KUDU_TEST_DEPENDENCIES(master_failover-itest
@@ -95,7 +96,6 @@ ADD_KUDU_TEST(master_migration-itest)
 ADD_KUDU_TEST_DEPENDENCIES(master_migration-itest
   kudu)
 ADD_KUDU_TEST(master_replication-itest)
-ADD_KUDU_TEST(master_sentry-itest RUN_SERIAL true NUM_SHARDS 8 PROCESSORS 4)
 ADD_KUDU_TEST(master-stress-test RUN_SERIAL true NUM_SHARDS 3)
 if(${KUDU_TCMALLOC_AVAILABLE})
   ADD_KUDU_TEST(memory_gc-itest)
diff --git a/src/kudu/integration-tests/hms_itest-base.cc b/src/kudu/integration-tests/hms_itest-base.cc
index 1566554..8957fc0 100644
--- a/src/kudu/integration-tests/hms_itest-base.cc
+++ b/src/kudu/integration-tests/hms_itest-base.cc
@@ -41,12 +41,14 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/user.h"
 
+using kudu::client::KuduClient;
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaBuilder;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniCluster;
 using kudu::hms::HmsClient;
 using std::string;
 using std::unique_ptr;
@@ -54,17 +56,17 @@ using strings::Substitute;
 
 namespace kudu {
 
-Status HmsITestBase::StopHms() {
+Status HmsITestHarness::StopHms(const unique_ptr<cluster::ExternalMiniCluster>& cluster) {
   RETURN_NOT_OK(hms_client_->Stop());
-  return cluster_->hms()->Stop();
+  return cluster->hms()->Stop();
 }
 
-Status HmsITestBase::StartHms() {
-  RETURN_NOT_OK(cluster_->hms()->Start());
+Status HmsITestHarness::StartHms(const unique_ptr<cluster::ExternalMiniCluster>& cluster) {
+  RETURN_NOT_OK(cluster->hms()->Start());
   return hms_client_->Start();
 }
 
-Status HmsITestBase::CreateDatabase(const string& database_name) {
+Status HmsITestHarness::CreateDatabase(const string& database_name) {
   hive::Database db;
   db.name = database_name;
   RETURN_NOT_OK(hms_client_->CreateDatabase(db));
@@ -73,9 +75,10 @@ Status HmsITestBase::CreateDatabase(const string& database_name) {
   return Status::OK();
 }
 
-Status HmsITestBase::CreateKuduTable(const string& database_name,
-                                     const string& table_name,
-                                     MonoDelta timeout) {
+Status HmsITestHarness::CreateKuduTable(const string& database_name,
+                                        const string& table_name,
+                                        const shared_ptr<client::KuduClient>& client,
+                                        MonoDelta timeout) {
   // Get coverage of all column types.
   KuduSchemaBuilder b;
   b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()
@@ -99,7 +102,7 @@ Status HmsITestBase::CreateKuduTable(const string& database_name,
                                ->Precision(kMaxDecimal128Precision);
   KuduSchema schema;
   RETURN_NOT_OK(b.Build(&schema));
-  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
   if (timeout.Initialized()) {
     // If specified, set the timeout for the operation.
     table_creator->timeout(timeout);
@@ -112,10 +115,10 @@ Status HmsITestBase::CreateKuduTable(const string& database_name,
       .Create();
 }
 
-Status HmsITestBase::CreateHmsTable(const string& database_name,
-                                    const string& table_name,
-                                    const string& table_type,
-                                    boost::optional<const string&> kudu_table_name) {
+Status HmsITestHarness::CreateHmsTable(const string& database_name,
+                                       const string& table_name,
+                                       const string& table_type,
+                                       const boost::optional<const string&>& kudu_table_name) {
   hive::Table hms_table;
   hms_table.dbName = database_name;
   hms_table.tableName = table_name;
@@ -133,9 +136,9 @@ Status HmsITestBase::CreateHmsTable(const string& database_name,
   return hms_client_->CreateTable(hms_table);
 }
 
-Status HmsITestBase::RenameHmsTable(const string& database_name,
-                                    const string& old_table_name,
-                                    const string& new_table_name) {
+Status HmsITestHarness::RenameHmsTable(const string& database_name,
+                                       const string& old_table_name,
+                                       const string& new_table_name) {
   // The HMS doesn't have a rename table API. Instead it offers the more
   // general AlterTable API, which requires the entire set of table fields to be
   // set. Since we don't know these fields during a simple rename operation, we
@@ -148,8 +151,8 @@ Status HmsITestBase::RenameHmsTable(const string& database_name,
   return hms_client_->AlterTable(database_name, old_table_name, table);
 }
 
-Status HmsITestBase::AlterHmsTableDropColumns(const string& database_name,
-                                              const string& table_name) {
+Status HmsITestHarness::AlterHmsTableDropColumns(const string& database_name,
+                                                 const string& table_name) {
     hive::Table table;
     RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
     table.sd.cols.clear();
@@ -162,8 +165,8 @@ Status HmsITestBase::AlterHmsTableDropColumns(const string& database_name,
     return Status::OK();
 }
 
-Status HmsITestBase::AlterHmsTableExternalPurge(const string& database_name,
-                                                const string& table_name) {
+Status HmsITestHarness::AlterHmsTableExternalPurge(const string& database_name,
+                                                   const string& table_name) {
   hive::Table table;
   RETURN_NOT_OK(hms_client_->GetTable(database_name, table_name, &table));
   table.tableType = HmsClient::kExternalTable;
@@ -178,13 +181,15 @@ Status HmsITestBase::AlterHmsTableExternalPurge(const string& database_name,
   return Status::OK();
 }
 
-void HmsITestBase::CheckTable(const string& database_name,
-                              const string& table_name,
-                              boost::optional<const string&> user,
-                              const string& table_type) {
+void HmsITestHarness::CheckTable(const string& database_name,
+                                 const string& table_name,
+                                 const boost::optional<const string&>& user,
+                                 const unique_ptr<ExternalMiniCluster>& cluster,
+                                 const shared_ptr<KuduClient>& client,
+                                 const string& table_type) {
   SCOPED_TRACE(Substitute("Checking table $0.$1", database_name, table_name));
   shared_ptr<KuduTable> table;
-  ASSERT_OK(client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
+  ASSERT_OK(client->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
 
   hive::Table hms_table;
   ASSERT_OK(hms_client_->GetTable(database_name, table_name, &hms_table));
@@ -208,18 +213,19 @@ void HmsITestBase::CheckTable(const string& database_name,
   ASSERT_EQ(table->id(), hms_table.parameters[hms::HmsClient::kKuduTableIdKey]);
   ASSERT_TRUE(boost::iequals(table->name(),
       hms_table.parameters[hms::HmsClient::kKuduTableNameKey]));
-  ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster_->master_rpc_addrs()),
+  ASSERT_EQ(HostPort::ToCommaSeparatedString(cluster->master_rpc_addrs()),
             hms_table.parameters[hms::HmsClient::kKuduMasterAddrsKey]);
   ASSERT_EQ(hms::HmsClient::kKuduStorageHandler,
             hms_table.parameters[hms::HmsClient::kStorageHandlerKey]);
 }
 
-void HmsITestBase::CheckTableDoesNotExist(const string& database_name,
-                                          const string& table_name) {
+void HmsITestHarness::CheckTableDoesNotExist(const string& database_name,
+                                             const string& table_name,
+                                             const shared_ptr<KuduClient>& client) {
   SCOPED_TRACE(Substitute("Checking table $0.$1 does not exist", database_name, table_name));
 
   shared_ptr<KuduTable> table;
-  Status s = client_->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
+  Status s = client->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
   hive::Table hms_table;
diff --git a/src/kudu/integration-tests/hms_itest-base.h b/src/kudu/integration-tests/hms_itest-base.h
index 58e5cd5..91d8dbe 100644
--- a/src/kudu/integration-tests/hms_itest-base.h
+++ b/src/kudu/integration-tests/hms_itest-base.h
@@ -24,25 +24,36 @@
 
 #include "kudu/gutil/port.h"
 #include "kudu/hms/hms_client.h"
-#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/hms/mini_hms.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
 class MonoDelta;
+namespace thrift {
+struct ClientOptions;
+} // namespace thrift
 
-class HmsITestBase : public ExternalMiniClusterITestBase {
+namespace client {
+class KuduClient;
+} // namespace client
+
+class HmsITestHarness {
  public:
-  Status StartHms() WARN_UNUSED_RESULT;
-  Status StopHms() WARN_UNUSED_RESULT;
+  Status StartHms(const std::unique_ptr<cluster::ExternalMiniCluster>& cluster)
+    WARN_UNUSED_RESULT;
+  Status StopHms(const std::unique_ptr<cluster::ExternalMiniCluster>& cluster)
+    WARN_UNUSED_RESULT;
 
   // Creates a database in the HMS catalog.
   Status CreateDatabase(const std::string& database_name) WARN_UNUSED_RESULT;
 
   // Creates a table in Kudu.
-  Status CreateKuduTable(const std::string& database_name,
-                         const std::string& table_name,
-                         MonoDelta timeout = {}) WARN_UNUSED_RESULT;
+  static Status CreateKuduTable(const std::string& database_name,
+                                const std::string& table_name,
+                                const client::sp::shared_ptr<client::KuduClient>& client,
+                                MonoDelta timeout = {}) WARN_UNUSED_RESULT;
 
   // Creates a table in the HMS catalog.
   // If supplied, 'kudu_table_name' will be used for the 'kudu.table_name'
@@ -50,7 +61,7 @@ class HmsITestBase : public ExternalMiniClusterITestBase {
   Status CreateHmsTable(const std::string& database_name,
                         const std::string& table_name,
                         const std::string& table_type = hms::HmsClient::kManagedTable,
-                        boost::optional<const std::string&> kudu_table_name = boost::none);
+                        const boost::optional<const std::string&>& kudu_table_name = boost::none);
 
   // Renames a table entry in the HMS catalog.
   Status RenameHmsTable(const std::string& database_name,
@@ -71,12 +82,25 @@ class HmsITestBase : public ExternalMiniClusterITestBase {
   // checks against the logged in user).
   void CheckTable(const std::string& database_name,
                   const std::string& table_name,
-                  boost::optional<const std::string&> user,
+                  const boost::optional<const std::string&>& user,
+                  const std::unique_ptr<cluster::ExternalMiniCluster>& cluster,
+                  const client::sp::shared_ptr<client::KuduClient>& client,
                   const std::string& table_type = hms::HmsClient::kManagedTable);
 
   // Checks that a table does not exist in the Kudu and HMS catalogs.
   void CheckTableDoesNotExist(const std::string& database_name,
-                              const std::string& table_name);
+                              const std::string& table_name,
+                              const client::sp::shared_ptr<client::KuduClient>& client);
+
+  hms::HmsClient* hms_client() {
+    return hms_client_.get();
+  }
+
+  Status RestartHmsClient(const std::unique_ptr<cluster::ExternalMiniCluster>& cluster,
+                          const thrift::ClientOptions& hms_opts) {
+    hms_client_.reset(new hms::HmsClient(cluster->hms()->address(), hms_opts));
+    return hms_client_->Start();
+  }
 
  protected:
   std::unique_ptr<hms::HmsClient> hms_client_;
diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_authz-itest.cc
similarity index 56%
rename from src/kudu/integration-tests/master_sentry-itest.cc
rename to src/kudu/integration-tests/master_authz-itest.cc
index b184af7..a67b1d2 100644
--- a/src/kudu/integration-tests/master_sentry-itest.cc
+++ b/src/kudu/integration-tests/master_authz-itest.cc
@@ -40,7 +40,6 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hms_client.h"
-#include "kudu/hms/mini_hms.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
 #include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/hms_itest-base.h"
@@ -58,6 +57,7 @@
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/decimal_util.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/slice.h"
@@ -78,6 +78,8 @@ using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
 using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::hms::HmsClient;
 using kudu::master::AlterRoleGrantPrivilege;
 using kudu::master::CreateRoleAndAddToGroups;
@@ -136,6 +138,23 @@ struct OperationParams {
   const string new_table_name;
 };
 
+enum HarnessEnum {
+  kSentry,
+  kSentryWithCache,
+  kRanger,
+};
+string HarnessEnumToString(HarnessEnum h) {
+  switch (h) {
+    case kSentry:
+      return "SentryNoCache";
+    case kSentryWithCache:
+      return "SentryWithCache";
+    case kRanger:
+      return "Ranger";
+  }
+  return "";
+}
+
 // Parameters for the privilege functor (see below for PrivilegeFunc).
 struct PrivilegeParams {
   // NOLINTNEXTLINE(google-explicit-constructor)
@@ -148,27 +167,15 @@ struct PrivilegeParams {
   const string table_name;
 };
 
-// Test Master authorization enforcement with Sentry and HMS
-// integration enabled.
-class SentryITestBase : public HmsITestBase {
+class MasterAuthzITestHarness {
  public:
-  Status StopSentry() {
-    RETURN_NOT_OK(sentry_client_->Stop());
-    RETURN_NOT_OK(cluster_->sentry()->Stop());
-    return Status::OK();
-  }
+  virtual ~MasterAuthzITestHarness() {}
 
-  Status StartSentry() {
-    RETURN_NOT_OK(cluster_->sentry()->Start());
-    RETURN_NOT_OK(cluster_->kdc()->Kinit("kudu"));
-    RETURN_NOT_OK(sentry_client_->Start());
-    return Status::OK();
-  }
-
-  Status GetTableLocationsWithTableId(const string& table_name,
-                                      optional<const string&> table_id) {
+  static Status GetTableLocationsWithTableId(const string& table_name,
+                                             const optional<const string&>& table_id,
+                                             const unique_ptr<ExternalMiniCluster>& cluster) {
     const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
-    std::shared_ptr<MasterServiceProxy> proxy = cluster_->master_proxy();
+    std::shared_ptr<MasterServiceProxy> proxy = cluster->master_proxy();
     UserCredentials user_credentials;
     user_credentials.set_real_user(kTestUser);
     proxy->set_user_credentials(user_credentials);
@@ -178,81 +185,54 @@ class SentryITestBase : public HmsITestBase {
                                     table_id, &table_locations);
   }
 
-  Status GrantCreateTablePrivilege(const PrivilegeParams& p) {
-    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetDatabasePrivilege(p.db_name, "CREATE"));
-  }
-
-  Status GrantDropTablePrivilege(const PrivilegeParams& p) {
-    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetTablePrivilege(p.db_name, p.table_name, "DROP"));
-  }
-
-  Status GrantAlterTablePrivilege(const PrivilegeParams& p) {
-    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetTablePrivilege(p.db_name, p.table_name, "ALTER"));
-  }
-
-  Status GrantRenameTablePrivilege(const PrivilegeParams& p) {
-    RETURN_NOT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetTablePrivilege(p.db_name, p.table_name, "ALL")));
-    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetDatabasePrivilege(p.db_name, "CREATE"));
-  }
-
-  Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) {
-    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
-        GetTablePrivilege(p.db_name, p.table_name, "METADATA"));
-  }
-
-  Status CreateTable(const OperationParams& p) {
-    Slice hms_database;
-    Slice hms_table;
-    RETURN_NOT_OK(ParseHiveTableIdentifier(p.table_name,
-                                           &hms_database, &hms_table));
-    return CreateKuduTable(hms_database.ToString(), hms_table.ToString());
-  }
-
-  Status IsCreateTableDone(const OperationParams& p) {
+  static Status IsCreateTableDone(const OperationParams& p,
+                                  const shared_ptr<KuduClient>& client) {
     bool in_progress = false;
-    return client_->IsCreateTableInProgress(p.table_name, &in_progress);
+    return client->IsCreateTableInProgress(p.table_name, &in_progress);
   }
 
-  Status DropTable(const OperationParams& p) {
-    return client_->DeleteTable(p.table_name);
+  static Status DropTable(const OperationParams& p,
+                          const shared_ptr<KuduClient>& client) {
+    return client->DeleteTable(p.table_name);
   }
 
-  Status AlterTable(const OperationParams& p) {
+  static Status AlterTable(const OperationParams& p,
+                           const shared_ptr<KuduClient>& client) {
     unique_ptr<KuduTableAlterer> alterer(
-        client_->NewTableAlterer(p.table_name));
+        client->NewTableAlterer(p.table_name));
     return alterer->DropColumn("int32_val")->Alter();
   }
 
-  Status IsAlterTableDone(const OperationParams& p) {
+  static Status IsAlterTableDone(const OperationParams& p,
+                                 const shared_ptr<KuduClient>& client) {
     bool in_progress = false;
-    return client_->IsAlterTableInProgress(p.table_name, &in_progress);
+    return client->IsAlterTableInProgress(p.table_name, &in_progress);
   }
 
-  Status RenameTable(const OperationParams& p) {
+  static Status RenameTable(const OperationParams& p,
+                            const shared_ptr<KuduClient>& client) {
     unique_ptr<KuduTableAlterer> alterer(
-        client_->NewTableAlterer(p.table_name));
+        client->NewTableAlterer(p.table_name));
     return alterer->RenameTo(p.new_table_name)->Alter();
   }
 
-  Status GetTableSchema(const OperationParams& p) {
+  static Status GetTableSchema(const OperationParams& p,
+                               const shared_ptr<KuduClient>& client) {
     KuduSchema schema;
-    return client_->GetTableSchema(p.table_name, &schema);
+    return client->GetTableSchema(p.table_name, &schema);
   }
 
-  Status GetTableLocations(const OperationParams& p) {
-    return GetTableLocationsWithTableId(p.table_name, /*table_id=*/none);
+  static Status GetTableLocations(const OperationParams& p,
+                                  const unique_ptr<ExternalMiniCluster>& cluster) {
+    return GetTableLocationsWithTableId(p.table_name, /*table_id=*/none, cluster);
   }
 
-  Status GetTabletLocations(const OperationParams& p) {
+  static Status GetTabletLocations(const OperationParams& p,
+                                   const unique_ptr<ExternalMiniCluster>& cluster) {
     // Log in as 'test-admin' to get the tablet ID.
-    RETURN_NOT_OK(cluster_->kdc()->Kinit(kAdminUser));
+    RETURN_NOT_OK(cluster->kdc()->Kinit(kAdminUser));
     shared_ptr<KuduClient> client;
-    RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
+    RETURN_NOT_OK(cluster->CreateClient(nullptr, &client));
 
     shared_ptr<KuduTable> kudu_table;
     RETURN_NOT_OK(client->OpenTable(p.table_name, &kudu_table));
@@ -264,10 +244,10 @@ class SentryITestBase : public HmsITestBase {
     const string& tablet_id = tokens[0]->tablet().id();
 
     // Log back as 'test-user'.
-    RETURN_NOT_OK(cluster_->kdc()->Kinit(kTestUser));
+    RETURN_NOT_OK(cluster->kdc()->Kinit(kTestUser));
 
     static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
-    std::shared_ptr<MasterServiceProxy> proxy(cluster_->master_proxy());
+    std::shared_ptr<MasterServiceProxy> proxy(cluster->master_proxy());
     UserCredentials user_credentials;
     user_credentials.set_real_user(kTestUser);
     proxy->set_user_credentials(user_credentials);
@@ -277,79 +257,188 @@ class SentryITestBase : public HmsITestBase {
                                      VOTER_REPLICA, &tablet_locations);
   }
 
-  void SetUp() override {
-    HmsITestBase::SetUp();
-
-    cluster::ExternalMiniClusterOptions opts;
-    // Always enable Kerberos, as sentry deployment does not make much sense
-    // in a non-Kerberized environment.
-    bool enable_kerberos = true;
-    opts.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
-    opts.enable_sentry = true;
-    opts.enable_kerberos = enable_kerberos;
-    // Configure the timeout to reduce the run time of tests that involve
-    // re-connections.
-    const string timeout = AllowSlowTests() ? "5" : "2";
-    opts.extra_master_flags.emplace_back(
-       Substitute("$0=$1", "--sentry_service_send_timeout_seconds", timeout));
-    opts.extra_master_flags.emplace_back(
-       Substitute("$0=$1", "--sentry_service_recv_timeout_seconds", timeout));
+  static Status CreateKuduTable(const string& database_name,
+                                const string& table_name,
+                                const shared_ptr<client::KuduClient>& client,
+                                MonoDelta timeout = {}) {
+    // Get coverage of all column types.
+    KuduSchemaBuilder b;
+    b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()
+                      ->PrimaryKey()->Comment("The Primary Key");
+    b.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
+    b.AddColumn("int16_val")->Type(KuduColumnSchema::INT16);
+    b.AddColumn("int32_val")->Type(KuduColumnSchema::INT32);
+    b.AddColumn("int64_val")->Type(KuduColumnSchema::INT64);
+    b.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
+    b.AddColumn("date_val")->Type(KuduColumnSchema::DATE);
+    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
+    b.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL);
+    b.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
+    b.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
+    b.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
+    b.AddColumn("decimal32_val")->Type(KuduColumnSchema::DECIMAL)
+                                ->Precision(kMaxDecimal32Precision);
+    b.AddColumn("decimal64_val")->Type(KuduColumnSchema::DECIMAL)
+                                ->Precision(kMaxDecimal64Precision);
+    b.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
+                                 ->Precision(kMaxDecimal128Precision);
+    KuduSchema schema;
+    RETURN_NOT_OK(b.Build(&schema));
+    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+    if (timeout.Initialized()) {
+      // If specified, set the timeout for the operation.
+      table_creator->timeout(timeout);
+    }
+    return table_creator->table_name(Substitute("$0.$1",
+                                                database_name, table_name))
+        .schema(&schema)
+        .num_replicas(1)
+        .set_range_partition_columns({ "key" })
+        .Create();
+  }
+
+  static void CheckTable(const string& database_name,
+                         const string& table_name,
+                         const boost::optional<const string&>& /*user*/,
+                         const unique_ptr<ExternalMiniCluster>& /*cluster*/,
+                         const shared_ptr<KuduClient>& client,
+                         const std::string& /*table_type*/ = hms::HmsClient::kManagedTable) {
+    SCOPED_TRACE(Substitute("Checking table $0.$1", database_name, table_name));
+    shared_ptr<KuduTable> table;
+    ASSERT_OK(client->OpenTable(Substitute("$0.$1", database_name, table_name), &table));
+  }
 
+  // Creates db.table and db.second_table.
+  virtual Status SetUpTables(const unique_ptr<ExternalMiniCluster>& cluster,
+                             const shared_ptr<KuduClient>& client) {
+    RETURN_NOT_OK(CreateKuduTable(kDatabaseName, kTableName, client));
+    RETURN_NOT_OK(CreateKuduTable(kDatabaseName, kSecondTable, client));
+    CheckTable(kDatabaseName, kTableName,
+               make_optional<const string&>(kAdminUser), cluster, client);
+    CheckTable(kDatabaseName, kSecondTable,
+               make_optional<const string&>(kAdminUser), cluster, client);
+    return Status::OK();
+  }
+
+  // Returns a set of opts appropriate for an authorization test.
+  ExternalMiniClusterOptions GetClusterOpts() {
+    ExternalMiniClusterOptions opts;
+    // Always enable Kerberos, as Sentry/Ranger deployments do not make sense
+    // in non-Kerberized environments.
+    opts.enable_kerberos = true;
     // Add 'impala' as trusted user who may access the cluster without being
     // authorized.
     opts.extra_master_flags.emplace_back("--trusted_user_acl=impala");
     opts.extra_master_flags.emplace_back("--user_acl=test-user,impala");
+    SetUpExternalMiniServiceOpts(&opts);
+    return opts;
+  }
 
-    // Enable/disable caching of authz privileges received from Sentry.
-    opts.extra_master_flags.emplace_back(
-        Substitute("--sentry_privileges_cache_capacity_mb=$0",
-                   IsAuthzPrivilegeCacheEnabled() ? 1 : 0));
+  virtual Status GrantCreateTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantDropTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantAlterTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantRenameTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) = 0;
+  virtual Status CreateTable(const OperationParams& p,
+                             const shared_ptr<KuduClient>& client) = 0;
+  virtual void CheckTableDoesNotExist(const string& database_name, const string& table_name,
+                                      shared_ptr<KuduClient> client) = 0;
+
+  // Start or stop the authorization services.
+  virtual Status StartAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) = 0;
+  virtual Status StopAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) = 0;
+
+  // Stop authorization service clients.
+  virtual void TearDown() = 0;
+
+  // Adds to 'opts' any options specific to the authorization service.
+  virtual void SetUpExternalMiniServiceOpts(ExternalMiniClusterOptions* opts) = 0;
+
+  // Sets up credentials such that kAdminUser has access to everything in
+  // kDatabaseName.
+  virtual Status SetUpCredentials() = 0;
+
+  // Sets things up so we can begin sending requests to the authorization
+  // services (whether that setup is an actual SentryClient or the ability to
+  // send curl requests to MiniRanger).
+  virtual Status SetUpExternalServiceClients(const unique_ptr<ExternalMiniCluster>& cluster) = 0;
+};
 
-    StartClusterWithOpts(std::move(opts));
+// Test Master authorization enforcement with Sentry and HMS
+// integration enabled.
+class SentryITestHarness : public MasterAuthzITestHarness,
+                           public HmsITestHarness {
+ public:
+  using MasterAuthzITestHarness::CreateKuduTable;
+  using HmsITestHarness::CheckTable;
+  Status StopAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    RETURN_NOT_OK(sentry_client_->Stop());
+    RETURN_NOT_OK(cluster->sentry()->Stop());
+    return Status::OK();
+  }
 
-    // Create principals 'impala' and 'kudu'. Configure to use the latter.
-    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(kImpalaUser));
-    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
-    ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
+  Status StartAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    RETURN_NOT_OK(cluster->sentry()->Start());
+    RETURN_NOT_OK(cluster->kdc()->Kinit("kudu"));
+    RETURN_NOT_OK(sentry_client_->Start());
+    return Status::OK();
+  }
 
-    thrift::ClientOptions hms_opts;
-    hms_opts.enable_kerberos = enable_kerberos;
-    hms_opts.service_principal = "hive";
-    hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
-    ASSERT_OK(hms_client_->Start());
+  void CheckTableDoesNotExist(const string& database_name, const string& table_name,
+                              shared_ptr<KuduClient> client) override {
+    return HmsITestHarness::CheckTableDoesNotExist(database_name, table_name, client);
+  }
 
-    thrift::ClientOptions sentry_opts;
-    sentry_opts.enable_kerberos = enable_kerberos;
-    sentry_opts.service_principal = "sentry";
-    sentry_client_.reset(new SentryClient(cluster_->sentry()->address(), sentry_opts));
-    ASSERT_OK(sentry_client_->Start());
+  Status GrantCreateTablePrivilege(const PrivilegeParams& p) override {
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetDatabasePrivilege(p.db_name, "CREATE"));
+  }
 
-    // User to Role mapping:
-    // 1. user -> developer,
-    // 2. admin -> admin.
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kDevRole, kUserGroup));
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup));
+  Status GrantDropTablePrivilege(const PrivilegeParams& p) override {
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetTablePrivilege(p.db_name, p.table_name, "DROP"));
+  }
 
-    // Grant privilege 'ALL' on database 'db' to role admin.
-    TSentryPrivilege privilege = GetDatabasePrivilege(
-        kDatabaseName, "ALL",
-        TSentryGrantOption::DISABLED);
-    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(),
-                                              kAdminRole, privilege));
-
-    // Create database 'db' in HMS and Kudu tables 'table' and 'second_table' which
-    // owns by user 'test-admin'.
-    ASSERT_OK(CreateDatabase(kDatabaseName));
-    ASSERT_OK(CreateKuduTable(kDatabaseName, kTableName));
-    ASSERT_OK(CreateKuduTable(kDatabaseName, kSecondTable));
-    NO_FATALS(CheckTable(kDatabaseName, kTableName,
-                         make_optional<const string&>(kAdminUser)));
-    NO_FATALS(CheckTable(kDatabaseName, kSecondTable,
-                         make_optional<const string&>(kAdminUser)));
+  Status GrantAlterTablePrivilege(const PrivilegeParams& p) override {
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetTablePrivilege(p.db_name, p.table_name, "ALTER"));
+  }
 
-    // Log in as 'test-user' and reset the client to pick up the change in user.
-    ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
-    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  Status GrantRenameTablePrivilege(const PrivilegeParams& p) override {
+    RETURN_NOT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetTablePrivilege(p.db_name, p.table_name, "ALL")));
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetDatabasePrivilege(p.db_name, "CREATE"));
+  }
+
+  Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) override {
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetTablePrivilege(p.db_name, p.table_name, "METADATA"));
+  }
+
+  Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) override {
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kDevRole,
+        GetDatabasePrivilege(p.db_name, "METADATA"));
+  }
+
+  Status CreateTable(const OperationParams& p,
+                     const shared_ptr<KuduClient>& client) override {
+    Slice hms_database;
+    Slice hms_table;
+    RETURN_NOT_OK(ParseHiveTableIdentifier(p.table_name,
+                                           &hms_database, &hms_table));
+    return MasterAuthzITestHarness::CreateKuduTable(hms_database.ToString(),
+                                                    hms_table.ToString(), client);
+  }
+
+  Status SetUpTables(const unique_ptr<ExternalMiniCluster>& cluster,
+                     const shared_ptr<KuduClient>& client) override {
+    // First create database 'db' in the HMS.
+    RETURN_NOT_OK(CreateDatabase(kDatabaseName));
+    // Then create Kudu tables 'table' and 'second_table', owned by user
+    // 'test-admin'.
+    return MasterAuthzITestHarness::SetUpTables(cluster, client);
   }
 
   void TearDown() override {
@@ -359,82 +448,341 @@ class SentryITestBase : public HmsITestBase {
     if (hms_client_) {
       ASSERT_OK(hms_client_->Stop());
     }
-    HmsITestBase::TearDown();
   }
 
-  virtual bool IsAuthzPrivilegeCacheEnabled() const {
-    return false;
+  void SetUpExternalMiniServiceOpts(ExternalMiniClusterOptions* opts) override {
+    opts->enable_sentry = true;
+    // Configure the timeout to reduce the run time of tests that involve
+    // re-connections.
+    const string timeout = AllowSlowTests() ? "5" : "2";
+    opts->hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION;
+    opts->extra_master_flags.emplace_back(
+       Substitute("$0=$1", "--sentry_service_send_timeout_seconds", timeout));
+    opts->extra_master_flags.emplace_back(
+       Substitute("$0=$1", "--sentry_service_recv_timeout_seconds", timeout));
+    // NOTE: this can be overwritten if another value is added to the end.
+    opts->extra_master_flags.emplace_back("--sentry_privileges_cache_capacity_mb=0");
+  }
+
+  Status SetUpExternalServiceClients(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    thrift::ClientOptions hms_opts;
+    hms_opts.enable_kerberos = true;
+    hms_opts.service_principal = "hive";
+    RETURN_NOT_OK(HmsITestHarness::RestartHmsClient(cluster, hms_opts));
+
+    thrift::ClientOptions sentry_opts;
+    sentry_opts.enable_kerberos = true;
+    sentry_opts.service_principal = "sentry";
+    sentry_client_.reset(new SentryClient(cluster->sentry()->address(), sentry_opts));
+    return sentry_client_->Start();
+  }
+
+  Status SetUpCredentials() override {
+    // User to Role mapping:
+    // 1. user -> developer,
+    // 2. admin -> admin.
+    RETURN_NOT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kDevRole, kUserGroup));
+    RETURN_NOT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup));
+
+    // Grant privilege 'ALL' on database 'db' to role admin.
+    TSentryPrivilege privilege = GetDatabasePrivilege(
+        kDatabaseName, "ALL",
+        TSentryGrantOption::DISABLED);
+    return AlterRoleGrantPrivilege(sentry_client_.get(), kAdminRole, privilege);
   }
 
  protected:
   unique_ptr<SentryClient> sentry_client_;
 };
 
-class RangerITestBase : public ExternalMiniClusterITestBase {
+class SentryWithCacheITestHarness : public SentryITestHarness {
  public:
-  void SetUp() override {
-    ExternalMiniClusterITestBase::SetUp();
+  void SetUpExternalMiniServiceOpts(ExternalMiniClusterOptions* opts) override {
+    NO_FATALS(SentryITestHarness::SetUpExternalMiniServiceOpts(opts));
+    opts->extra_master_flags.emplace_back("--sentry_privileges_cache_capacity_mb=1");
+  }
+};
 
-    cluster::ExternalMiniClusterOptions opts;
-    opts.enable_ranger = true;
-    opts.enable_kerberos = true;
+class RangerITestHarness : public MasterAuthzITestHarness {
+ public:
+  static constexpr int kSleepAfterNewPolicyMs = 1200;
+
+  Status GrantCreateTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    // IsCreateTableDone() requires METADATA on the table level.
+    policy.tables.emplace_back("*");
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::CREATE}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+
+    return Status::OK();
+  }
+
+  Status GrantDropTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::DROP}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+
+    return Status::OK();
+  }
+
+  void CheckTableDoesNotExist(const string& database_name, const string& table_name,
+                              shared_ptr<KuduClient> client) override {
+    shared_ptr<KuduTable> table;
+    Status s = client->OpenTable(Substitute("$0.$1", database_name, table_name), &table);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  }
+
+  Status GrantAlterTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALTER}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
 
-    StartClusterWithOpts(std::move(opts));
-    ranger_ = this->cluster_->ranger();
+  Status GrantRenameTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy_new_table;
+    policy_new_table.databases.emplace_back(p.db_name);
+    // IsCreateTableDone() requires METADATA on the table level.
+    policy_new_table.tables.emplace_back("*");
+    policy_new_table.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::CREATE}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy_new_table)));
 
     AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::ALL}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) override {
+    AuthorizationPolicy policy;
+    policy.databases.emplace_back(p.db_name);
+    policy.tables.emplace_back(p.table_name);
+    policy.items.emplace_back(PolicyItem({kTestUser}, {ActionPB::METADATA}));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
+  }
+
+  Status CreateTable(const OperationParams& p,
+                     const shared_ptr<KuduClient>& client) override {
+    string ranger_db;
+    Slice ranger_table;
+    RETURN_NOT_OK(ParseRangerTableIdentifier(p.table_name,
+                                             &ranger_db, &ranger_table));
+    return CreateKuduTable(ranger_db, ranger_table.ToString(), client);
+  }
+
+  Status StartAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    return cluster->ranger()->Start();
+  }
+
+  Status StopAuthzProvider(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    return cluster->ranger()->Stop();
+  }
+
+  void TearDown() override {}
+
+ protected:
+  void SetUpExternalMiniServiceOpts(ExternalMiniClusterOptions* opts) override {
+    opts->enable_ranger = true;
+  }
+
+  Status SetUpExternalServiceClients(const unique_ptr<ExternalMiniCluster>& cluster) override {
+    ranger_ = cluster->ranger();
+    return Status::OK();
+  }
+
+  Status SetUpCredentials() override {
+    AuthorizationPolicy policy;
     policy.databases.emplace_back(kDatabaseName);
     policy.tables.emplace_back("*");
     policy.items.emplace_back(PolicyItem({kAdminUser}, {ActionPB::ALL}));
-    ASSERT_OK(ranger_->AddPolicy(move(policy)));
-    SleepFor(MonoDelta::FromMilliseconds(1500));
+    RETURN_NOT_OK(ranger_->AddPolicy(move(policy)));
+    SleepFor(MonoDelta::FromMilliseconds(kSleepAfterNewPolicyMs));
+    return Status::OK();
   }
+
  private:
+  // Points to the Ranger instance used by the harness. This can be used to set
+  // policies.
   MiniRanger* ranger_;
 };
 
-// Functor that performs a certain operation (e.g. Create, Rename) on a table
-// given its name and its desired new name, if necessary (only used for Rename).
-typedef function<Status(SentryITestBase*, const OperationParams&)> OperatorFunc;
+// Base fixture for master authorization tests. The authorization provider
+// (Sentry with HMS integration, or Ranger) is chosen in SetUpCluster().
+class MasterAuthzITestBase : public ExternalMiniClusterITestBase {
+ public:
+  void SetUp() override {
+    NO_FATALS(ExternalMiniClusterITestBase::SetUp());
+  }
 
-// Functor that grants the required permission for an operation (e.g Create,
-// Rename) on a table given the database the table belongs to and the name of
-// the table, if applicable.
-typedef function<Status(SentryITestBase*, const PrivilegeParams&)> PrivilegeFunc;
+  void SetUpCluster(HarnessEnum harness) {
+    switch (harness) {
+      case kSentry:
+        harness_.reset(new SentryITestHarness());
+        break;
+      case kSentryWithCache:
+        harness_.reset(new SentryWithCacheITestHarness());
+        break;
+      case kRanger:
+        harness_.reset(new RangerITestHarness());
+        break;
+      default:
+        LOG(FATAL) << "unknown harness";
+    }
+    ExternalMiniClusterOptions opts = harness_->GetClusterOpts();
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
 
-// A description of the operation function that describes a particular action
-// on a table a user can perform, as well as the privilege granting function
-// that grants the required permission to the user to perform the action.
-struct AuthzFuncs {
-  OperatorFunc do_action;
-  PrivilegeFunc grant_privileges;
-  string description;
-};
-ostream& operator <<(ostream& out, const AuthzFuncs& d) {
-  out << d.description;
-  return out;
-}
+    // Create principals 'impala' and 'kudu'. Configure to use the latter.
+    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(kImpalaUser));
+    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
+    ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
 
-// A description of an authorization process, including the protected resource (table),
-// the operation function, as well as the privilege granting function.
-struct AuthzDescriptor {
-  AuthzFuncs funcs;
-  string database;
-  string table_name;
-  string new_table_name;
+    ASSERT_OK(harness_->SetUpExternalServiceClients(cluster_));
+    ASSERT_OK(harness_->SetUpCredentials());
+    ASSERT_OK(harness_->SetUpTables(cluster_, client_));
+
+    // Log in as 'test-user' and reset the client to pick up the change in user.
+    ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
+    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  }
+
+  void TearDown() override {
+    harness_->TearDown();
+    ExternalMiniClusterITestBase::TearDown();
+  }
+
+  Status GetTableLocationsWithTableId(const string& table_name,
+                                      const optional<const string&>& table_id) {
+    return harness_->GetTableLocationsWithTableId(table_name, table_id, cluster_);
+  }
+
+  Status GrantCreateTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantCreateTablePrivilege(p);
+  }
+
+  Status GrantDropTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantDropTablePrivilege(p);
+  }
+
+  Status GrantAlterTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantAlterTablePrivilege(p);
+  }
+
+  Status GrantRenameTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantRenameTablePrivilege(p);
+  }
+
+  Status GrantGetMetadataTablePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantGetMetadataTablePrivilege(p);
+  }
+
+  Status GrantGetMetadataDatabasePrivilege(const PrivilegeParams& p) {
+    return harness_->GrantGetMetadataDatabasePrivilege(p);
+  }
+
+  Status CreateTable(const OperationParams& p) {
+    return harness_->CreateTable(p, client_);
+  }
+
+  Status IsCreateTableDone(const OperationParams& p) {
+    return harness_->IsCreateTableDone(p, client_);
+  }
+
+  Status DropTable(const OperationParams& p) {
+    return harness_->DropTable(p, client_);
+  }
+
+  Status AlterTable(const OperationParams& p) {
+    return harness_->AlterTable(p, client_);
+  }
+
+  Status IsAlterTableDone(const OperationParams& p) {
+    return harness_->IsAlterTableDone(p, client_);
+  }
+
+  Status RenameTable(const OperationParams& p) {
+    return harness_->RenameTable(p, client_);
+  }
+
+  Status GetTableSchema(const OperationParams& p) {
+    return harness_->GetTableSchema(p, client_);
+  }
+
+  Status GetTableLocations(const OperationParams& p) {
+    return harness_->GetTableLocations(p, cluster_);
+  }
+
+  Status GetTabletLocations(const OperationParams& p) {
+    return harness_->GetTabletLocations(p, cluster_);
+  }
+
+  Status CreateKuduTable(const std::string& database_name,
+                         const std::string& table_name,
+                         MonoDelta timeout = {}) {
+    return harness_->CreateKuduTable(database_name, table_name, client_, timeout);
+  }
+
+  void CheckTable(const std::string& database_name,
+                  const std::string& table_name,
+                  const boost::optional<const std::string&>& user,
+                  const std::string& table_type = hms::HmsClient::kManagedTable) {
+    harness_->CheckTable(database_name, table_name, user, cluster_, client_, table_type);
+  }
+
+  void CheckTableDoesNotExist(const std::string& database_name,
+                              const std::string& table_name) {
+    harness_->CheckTableDoesNotExist(database_name, table_name, client_);
+  }
+
+  Status StartAuthzProvider() {
+    return harness_->StartAuthzProvider(cluster_);
+  }
+
+  Status StopAuthzProvider() {
+    return harness_->StopAuthzProvider(cluster_);
+  }
+
+ protected:
+  std::unique_ptr<MasterAuthzITestHarness> harness_;
 };
-ostream& operator <<(ostream& out, const AuthzDescriptor& d) {
-  out << d.funcs.description;
-  return out;
-}
 
-// Test basic master authorization enforcement with Sentry and HMS integration
-// enabled.
-class MasterSentryTest : public SentryITestBase {};
+class MasterAuthzITest : public MasterAuthzITestBase,
+                         public ::testing::WithParamInterface<HarnessEnum> {
+ public:
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(GetParam()));
+  }
+};
 
-class MasterRangerTest : public RangerITestBase {};
+INSTANTIATE_TEST_CASE_P(AuthzProviders, MasterAuthzITest,
+    ::testing::Values(kSentry, kRanger),
+    [] (const testing::TestParamInfo<MasterAuthzITest::ParamType>& info) {
+      return HarnessEnumToString(info.param);
+    });
 
-TEST_F(MasterRangerTest, TestCreateTableAuthorized) {
+TEST_P(MasterAuthzITest, TestCreateTableAuthorized) {
   ASSERT_OK(cluster_->kdc()->Kinit(kAdminUser));
   ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
 
@@ -445,14 +793,14 @@ TEST_F(MasterRangerTest, TestCreateTableAuthorized) {
   ASSERT_OK(b.Build(&schema));
   unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
   ASSERT_OK(table_creator->table_name(Substitute("$0.$1",
-                                      kDatabaseName, kTableName))
+                                      kDatabaseName, "another_table"))
     .schema(&schema)
     .num_replicas(1)
     .set_range_partition_columns({"key"})
     .Create());
 }
 
-TEST_F(MasterRangerTest, TestCreateTableUnauthorized) {
+TEST_P(MasterAuthzITest, TestCreateTableUnauthorized) {
   ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
   ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
 
@@ -463,7 +811,7 @@ TEST_F(MasterRangerTest, TestCreateTableUnauthorized) {
   ASSERT_OK(b.Build(&schema));
   unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
   ASSERT_TRUE(table_creator->table_name(Substitute("$0.$1",
-                                        kDatabaseName, kTableName))
+                                        kDatabaseName, "another_table"))
     .schema(&schema)
     .num_replicas(1)
     .set_range_partition_columns({"key"})
@@ -471,24 +819,24 @@ TEST_F(MasterRangerTest, TestCreateTableUnauthorized) {
 }
 
 // Test that the trusted user can access the cluster without being authorized.
-TEST_F(MasterSentryTest, TestTrustedUserAcl) {
+TEST_P(MasterAuthzITest, TestTrustedUserAcl) {
   // Log in as 'impala' and reset the client to pick up the change in user.
-  ASSERT_OK(cluster_->kdc()->Kinit(kImpalaUser));
-  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  ASSERT_OK(this->cluster_->kdc()->Kinit(kImpalaUser));
+  ASSERT_OK(this->cluster_->CreateClient(nullptr, &this->client_));
 
   vector<string> tables;
-  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_OK(this->client_->ListTables(&tables));
   unordered_set<string> tables_set(tables.begin(), tables.end());
   ASSERT_EQ(unordered_set<string>({ Substitute("$0.$1", kDatabaseName, kTableName),
                                     Substitute("$0.$1", kDatabaseName, kSecondTable) }),
             tables_set);
 
-  ASSERT_OK(CreateKuduTable(kDatabaseName, "new_table"));
-  NO_FATALS(CheckTable(kDatabaseName, "new_table",
-                       make_optional<const string&>(kImpalaUser)));
+  ASSERT_OK(this->CreateKuduTable(kDatabaseName, "new_table"));
+  NO_FATALS(this->CheckTable(kDatabaseName, "new_table",
+                             make_optional<const string&>(kImpalaUser)));
 }
 
-TEST_F(MasterSentryTest, TestAuthzListTables) {
+TEST_P(MasterAuthzITest, TestAuthzListTables) {
   // ListTables is not parameterized as other operations below, because non-authorized
   // tables will be filtered instead of returning NOT_AUTHORIZED error.
   const auto table_name = Substitute("$0.$1", kDatabaseName, kTableName);
@@ -496,40 +844,40 @@ TEST_F(MasterSentryTest, TestAuthzListTables) {
 
   // Listing tables shows nothing without proper privileges.
   vector<string> tables;
-  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_OK(this->client_->ListTables(&tables));
   ASSERT_TRUE(tables.empty());
 
   // Listing tables only shows the tables which user has proper privileges on.
   tables.clear();
-  ASSERT_OK(GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
-  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
+  ASSERT_OK(this->client_->ListTables(&tables));
   ASSERT_EQ(vector<string>({ table_name }), tables);
 
   tables.clear();
-  ASSERT_OK(GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable }));
-  ASSERT_OK(client_->ListTables(&tables));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable }));
+  ASSERT_OK(this->client_->ListTables(&tables));
   unordered_set<string> tables_set(tables.begin(), tables.end());
   ASSERT_EQ(unordered_set<string>({ table_name, sec_table_name }), tables_set);
 }
 
 // When authorizing ListTables, if there is a concurrent rename, we may not end
 // up showing the table.
-TEST_F(MasterSentryTest, TestAuthzListTablesConcurrentRename) {
-  ASSERT_OK(cluster_->SetFlag(cluster_->master(),
+TEST_P(MasterAuthzITest, TestAuthzListTablesConcurrentRename) {
+  ASSERT_OK(this->cluster_->SetFlag(this->cluster_->master(),
       "catalog_manager_inject_latency_list_authz_ms", "3000"));;
   const auto table_name = Substitute("$0.$1", kDatabaseName, kTableName);
   const auto sec_table_name = Substitute("$0.$1", kDatabaseName, kSecondTable);
-  ASSERT_OK(GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
-  ASSERT_OK(GrantRenameTablePrivilege({ kDatabaseName, kTableName }));
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
+  ASSERT_OK(this->GrantRenameTablePrivilege({ kDatabaseName, kTableName }));
 
   // List the tables while injecting latency.
   vector<string> tables;
   thread t([&] {
-    ASSERT_OK(client_->ListTables(&tables));
+    ASSERT_OK(this->client_->ListTables(&tables));
   });
 
   // While that's happening, rename one of the tables.
-  ASSERT_OK(RenameTable({ table_name, Substitute("$0.$1", kDatabaseName, "b") }));
+  ASSERT_OK(this->RenameTable({ table_name, Substitute("$0.$1", kDatabaseName, "b") }));
   NO_FATALS(t.join());
 
   // We shouldn't see the renamed table.
@@ -537,54 +885,87 @@ TEST_F(MasterSentryTest, TestAuthzListTablesConcurrentRename) {
   ASSERT_EQ(sec_table_name, tables[0]);
 }
 
-TEST_F(MasterSentryTest, TestTableOwnership) {
-  ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName }));
-  ASSERT_OK(CreateKuduTable(kDatabaseName, "new_table"));
-  NO_FATALS(CheckTable(kDatabaseName, "new_table",
-                       make_optional<const string&>(kTestUser)));
+// Test that when the client passes a table identifier whose table name and
+// table ID refer to different tables, the client needs privileges on both
+// tables before a TABLE_NOT_FOUND error is returned, to avoid leaking table
+// existence.
+TEST_P(MasterAuthzITest, TestMismatchedTable) {
+  const auto table_name_a = Substitute("$0.$1", kDatabaseName, kTableName);
+  const auto table_name_b = Substitute("$0.$1", kDatabaseName, kSecondTable);
 
-  // Checks the user with table ownership automatically has ALL privilege on the
-  // table. User 'test-user' can delete the same table without specifically granting
-  // 'DROP ON TABLE'. Note that ownership population between the HMS and the Sentry
-  // service happens synchronously, therefore, the table deletion should succeed
-  // right after the table creation.
-  // TODO(hao): test create a table with a different owner than the client’s username?
-  ASSERT_OK(client_->DeleteTable(Substitute("$0.$1", kDatabaseName, "new_table")));
-  NO_FATALS(CheckTableDoesNotExist(kDatabaseName, "new_table"));
+  // Log in as 'test-admin' to get the table ID.
+  ASSERT_OK(this->cluster_->kdc()->Kinit(kAdminUser));
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(this->cluster_->CreateClient(nullptr, &client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(table_name_a, &table));
+  optional<const string&> table_id_a = make_optional<const string&>(table->id());
+
+  // Log back as 'test-user'.
+  ASSERT_OK(this->cluster_->kdc()->Kinit(kTestUser));
+
+  Status s = this->GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
+
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
+  s = this->GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
+
+  ASSERT_OK(this->GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable }));
+  s = this->GetTableLocationsWithTableId(table_name_b, table_id_a);
+  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_STR_CONTAINS(s.ToString(), "the table ID refers to a different table");
 }
 
-// Checks Sentry privileges are synchronized upon table rename in the HMS.
-TEST_F(MasterSentryTest, TestRenameTablePrivilegeTransfer) {
-  ASSERT_OK(GrantRenameTablePrivilege({ kDatabaseName, kTableName }));
-  ASSERT_OK(RenameTable({ Substitute("$0.$1", kDatabaseName, kTableName),
-                          Substitute("$0.$1", kDatabaseName, "b") }));
-  NO_FATALS(CheckTable(kDatabaseName, "b",
-                       make_optional<const string&>(kAdminUser)));
+// Functor that performs a certain operation (e.g. Create, Rename) on a table
+// given its name and its desired new name, if necessary (only used for Rename).
+typedef function<Status(MasterAuthzITestBase*, const OperationParams&)> OperatorFunc;
 
-  unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(
-      Substitute("$0.$1", kDatabaseName, "b")));
-  alterer->DropColumn("int16_val");
+// Functor that grants the required permission for an operation (e.g Create,
+// Rename) on a table given the database the table belongs to and the name of
+// the table, if applicable.
+typedef function<Status(MasterAuthzITestBase*, const PrivilegeParams&)> PrivilegeFunc;
 
-  // Note that unlike table creation, there could be a delay between the table renaming
-  // in Kudu and the privilege renaming in Sentry. Because Kudu uses the transactional
-  // listener of the HMS to get notification of table alteration events, while Sentry
-  // uses post event listener (which is executed outside the HMS transaction). There
-  // is a chance that Kudu already finish the table renaming but the privilege renaming
-  // hasn't been reflected in the Sentry service.
-  ASSERT_EVENTUALLY([&] {
-    ASSERT_OK(alterer->Alter());
-  });
-  NO_FATALS(CheckTable(kDatabaseName, "b", make_optional<const string&>(kAdminUser)));
+// A description of the operation function that describes a particular action
+// on a table a user can perform, as well as the privilege granting function
+// that grants the required permission to the user to perform the action.
+struct AuthzFuncs {
+  OperatorFunc do_action;
+  PrivilegeFunc grant_privileges;
+  string description;
+};
+ostream& operator <<(ostream& out, const AuthzFuncs& d) {
+  out << d.description;
+  return out;
+}
+
+// A description of an authorization process, including the protected resource (table),
+// the operation function, as well as the privilege granting function.
+struct AuthzDescriptor {
+  AuthzFuncs funcs;
+  string database;
+  string table_name;
+  string new_table_name;
+};
+ostream& operator <<(ostream& out, const AuthzDescriptor& d) {
+  out << d.funcs.description;
+  return out;
 }
 
 class TestAuthzTable :
-    public MasterSentryTest,
-    public ::testing::WithParamInterface<AuthzDescriptor> {
-    // TODO(aserbin): update the test to introduce authz privilege caching
+    public MasterAuthzITestBase,
+    public ::testing::WithParamInterface<std::tuple<HarnessEnum, AuthzDescriptor>> {
+ public:
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(std::get<0>(GetParam())));
+  }
 };
 
 TEST_P(TestAuthzTable, TestAuthorizeTable) {
-  const AuthzDescriptor& desc = GetParam();
+  const AuthzDescriptor& desc = std::get<1>(GetParam());
   const auto table_name = Substitute("$0.$1", desc.database, desc.table_name);
   const auto new_table_name = Substitute("$0.$1",
                                          desc.database, desc.new_table_name);
@@ -594,23 +975,26 @@ TEST_P(TestAuthzTable, TestAuthorizeTable) {
   // User 'test-user' attempts to operate on the table without proper privileges.
   Status s = desc.funcs.do_action(this, action_params);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+  ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
 
   // User 'test-user' can operate on the table after obtaining proper privileges.
   ASSERT_OK(desc.funcs.grant_privileges(this, privilege_params));
   ASSERT_OK(desc.funcs.do_action(this, action_params));
 
   // Ensure that operating on a table while the Sentry is unreachable fails.
-  ASSERT_OK(StopSentry());
-  s = desc.funcs.do_action(this, action_params);
-  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  // No such guarantee exists for Ranger, which caches policies in its clients.
+  if (std::get<0>(GetParam()) != kRanger) {
+    ASSERT_OK(StopAuthzProvider());
+    s = desc.funcs.do_action(this, action_params);
+    ASSERT_FALSE(s.ok()) << s.ToString();
+  }
 }
 
 static const AuthzDescriptor kAuthzCombinations[] = {
     {
       {
-        &SentryITestBase::CreateTable,
-        &SentryITestBase::GrantCreateTablePrivilege,
+        &MasterAuthzITestBase::CreateTable,
+        &MasterAuthzITestBase::GrantCreateTablePrivilege,
         "CreateTable",
       },
       kDatabaseName,
@@ -619,8 +1003,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::DropTable,
-        &SentryITestBase::GrantDropTablePrivilege,
+        &MasterAuthzITestBase::DropTable,
+        &MasterAuthzITestBase::GrantDropTablePrivilege,
         "DropTable",
       },
       kDatabaseName,
@@ -629,8 +1013,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::AlterTable,
-        &SentryITestBase::GrantAlterTablePrivilege,
+        &MasterAuthzITestBase::AlterTable,
+        &MasterAuthzITestBase::GrantAlterTablePrivilege,
         "AlterTable",
       },
       kDatabaseName,
@@ -639,8 +1023,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::RenameTable,
-        &SentryITestBase::GrantRenameTablePrivilege,
+        &MasterAuthzITestBase::RenameTable,
+        &MasterAuthzITestBase::GrantRenameTablePrivilege,
         "RenameTable",
       },
       kDatabaseName,
@@ -649,8 +1033,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::GetTableSchema,
-        &SentryITestBase::GrantGetMetadataTablePrivilege,
+        &MasterAuthzITestBase::GetTableSchema,
+        &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
         "GetTableSchema",
       },
       kDatabaseName,
@@ -659,8 +1043,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::GetTableLocations,
-        &SentryITestBase::GrantGetMetadataTablePrivilege,
+        &MasterAuthzITestBase::GetTableLocations,
+        &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
         "GetTableLocations",
       },
       kDatabaseName,
@@ -669,8 +1053,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::GetTabletLocations,
-        &SentryITestBase::GrantGetMetadataTablePrivilege,
+        &MasterAuthzITestBase::GetTabletLocations,
+        &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
         "GetTabletLocations",
       },
       kDatabaseName,
@@ -679,8 +1063,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::IsCreateTableDone,
-        &SentryITestBase::GrantGetMetadataTablePrivilege,
+        &MasterAuthzITestBase::IsCreateTableDone,
+        &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
         "IsCreateTableDone",
       },
       kDatabaseName,
@@ -689,8 +1073,8 @@ static const AuthzDescriptor kAuthzCombinations[] = {
     },
     {
       {
-        &SentryITestBase::IsAlterTableDone,
-        &SentryITestBase::GrantGetMetadataTablePrivilege,
+        &MasterAuthzITestBase::IsAlterTableDone,
+        &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
         "IsAlterTableDone",
       },
       kDatabaseName,
@@ -700,50 +1084,76 @@ static const AuthzDescriptor kAuthzCombinations[] = {
 };
 INSTANTIATE_TEST_CASE_P(AuthzCombinations,
                         TestAuthzTable,
-                        ::testing::ValuesIn(kAuthzCombinations));
-
-// Test that when the client passes a table identifier with the table name
-// and table ID refer to different tables, the client needs permission on
-// both tables for returning TABLE_NOT_FOUND error to avoid leaking table
-// existence.
-TEST_F(MasterSentryTest, TestMismatchedTable) {
-  const auto table_name_a = Substitute("$0.$1", kDatabaseName, kTableName);
-  const auto table_name_b = Substitute("$0.$1", kDatabaseName, kSecondTable);
+                        ::testing::Combine(
+                            ::testing::Values(kSentry, kRanger),
+                            ::testing::ValuesIn(kAuthzCombinations)),
+                        [] (const testing::TestParamInfo<TestAuthzTable::ParamType>& info) {
+                          return Substitute("$0_$1", HarnessEnumToString(std::get<0>(info.param)),
+                                            std::get<1>(info.param).funcs.description);
+                        });
+
+class MasterSentryITest : public MasterAuthzITestBase {
+ public:
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(kSentry));
+  }
+};
 
-  // Log in as 'test-admin' to get the tablet ID.
-  ASSERT_OK(cluster_->kdc()->Kinit(kAdminUser));
-  shared_ptr<KuduClient> client;
-  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
-  shared_ptr<KuduTable> table;
-  ASSERT_OK(client->OpenTable(table_name_a, &table));
-  optional<const string&> table_id_a = make_optional<const string&>(table->id());
+// Checks the user with table ownership automatically has ALL privilege on the
+// table. User 'test-user' can delete the same table without specifically
+// granting 'DROP ON TABLE'. Note that ownership population between the HMS and
+// the Sentry service happens synchronously, therefore, the table deletion
+// should succeed right after the table creation.
+// NOTE: this behavior is specific to Sentry, so we don't parameterize.
+TEST_F(MasterSentryITest, TestTableOwnership) {
+  ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName }));
+  ASSERT_OK(CreateKuduTable(kDatabaseName, "new_table"));
+  NO_FATALS(CheckTable(kDatabaseName, "new_table",
+                       make_optional<const string&>(kTestUser)));
 
-  // Log back as 'test-user'.
-  ASSERT_OK(cluster_->kdc()->Kinit(kTestUser));
+  // TODO(hao): test creating a table with a different owner than the client's username?
+  ASSERT_OK(client_->DeleteTable(Substitute("$0.$1", kDatabaseName, "new_table")));
+  NO_FATALS(CheckTableDoesNotExist(kDatabaseName, "new_table"));
+}
 
-  Status s = GetTableLocationsWithTableId(table_name_b, table_id_a);
-  ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+// Checks Sentry privileges are synchronized upon table rename in the HMS.
+TEST_F(MasterSentryITest, TestRenameTablePrivilegeTransfer) {
+  ASSERT_OK(GrantRenameTablePrivilege({ kDatabaseName, kTableName }));
+  ASSERT_OK(RenameTable({ Substitute("$0.$1", kDatabaseName, kTableName),
+                          Substitute("$0.$1", kDatabaseName, "b") }));
+  NO_FATALS(CheckTable(kDatabaseName, "b",
+                       make_optional<const string&>(kAdminUser)));
 
-  ASSERT_OK(GrantGetMetadataTablePrivilege({ kDatabaseName, kTableName }));
-  s = GetTableLocationsWithTableId(table_name_b, table_id_a);
-  ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+  unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(
+      Substitute("$0.$1", kDatabaseName, "b")));
+  alterer->DropColumn("int16_val");
 
-  ASSERT_OK(GrantGetMetadataTablePrivilege({ kDatabaseName, kSecondTable }));
-  s = GetTableLocationsWithTableId(table_name_b, table_id_a);
-  ASSERT_TRUE(s.IsNotFound());
-  ASSERT_STR_CONTAINS(s.ToString(), "the table ID refers to a different table");
+  // Note that unlike table creation, there could be a delay between the table renaming
+  // in Kudu and the privilege renaming in Sentry. This is because Kudu uses the
+  // transactional listener of the HMS to get notified of table alteration events, while
+  // Sentry uses a post-event listener (which is executed outside the HMS transaction).
+  // So there is a chance that Kudu has already finished the table renaming but the
+  // privilege renaming hasn't been reflected in the Sentry service yet.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(alterer->Alter());
+  });
+  NO_FATALS(CheckTable(kDatabaseName, "b", make_optional<const string&>(kAdminUser)));
 }
 
 class AuthzErrorHandlingTest :
-    public SentryITestBase,
-    public ::testing::WithParamInterface<AuthzFuncs> {
+    public MasterAuthzITestBase,
+    public ::testing::WithParamInterface<std::tuple<HarnessEnum, AuthzFuncs>> {
     // TODO(aserbin): update the test to introduce authz privilege caching
+ public:
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(std::get<0>(GetParam())));
+  }
 };
 TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
   static constexpr const char* const kTableName = "non_existent";
-  const AuthzFuncs& funcs = GetParam();
+  const AuthzFuncs& funcs = std::get<1>(GetParam());
   const auto table_name = Substitute("$0.$1", kDatabaseName, kTableName);
   const auto new_table_name = Substitute("$0.$1", kDatabaseName, "b");
   const OperationParams action_params = { table_name, new_table_name };
@@ -754,20 +1164,22 @@ TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
   // TABLE_NOT_FOUND error.
   Status s = funcs.do_action(this, action_params);
   ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
+  ASSERT_STR_MATCHES(s.ToString(), "[Uu]nauthorized action");
 
-  // Ensure that operating on a non-existent table fails
-  // while Sentry is unreachable.
-  ASSERT_OK(StopSentry());
-  s = funcs.do_action(this, action_params);
-  ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  // Ensure that operating on a non-existent table fails while Sentry is
+  // unreachable. No such guarantee exists for Ranger.
+  if (std::get<0>(GetParam()) != kRanger) {
+    ASSERT_OK(StopAuthzProvider());
+    s = funcs.do_action(this, action_params);
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
 
-  // Ensure that operating on a non-existent table with proper privileges gives a
-  // TABLE_NOT_FOUND error.
-  ASSERT_OK(StartSentry());
+    // Ensure that operating on a non-existent table with proper privileges gives a
+    // TABLE_NOT_FOUND error.
+    ASSERT_OK(StartAuthzProvider());
+  }
   ASSERT_EVENTUALLY([&] {
-    // SentryAuthzProvider throttles reconnections, so it's necessary
-    // to wait out the backoff.
+    // SentryAuthzProvider throttles reconnections, so it's necessary to wait
+    // out the backoff.
     ASSERT_OK(funcs.grant_privileges(this, privilege_params));
   });
   s = funcs.do_action(this, action_params);
@@ -776,52 +1188,59 @@ TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
 
 static const AuthzFuncs kAuthzFuncCombinations[] = {
     {
-      &SentryITestBase::DropTable,
-      &SentryITestBase::GrantDropTablePrivilege,
+      &MasterAuthzITestBase::DropTable,
+      &MasterAuthzITestBase::GrantDropTablePrivilege,
       "DropTable"
     },
     {
-      &SentryITestBase::AlterTable,
-      &SentryITestBase::GrantAlterTablePrivilege,
+      &MasterAuthzITestBase::AlterTable,
+      &MasterAuthzITestBase::GrantAlterTablePrivilege,
       "AlterTable"
     },
     {
-      &SentryITestBase::RenameTable,
-      &SentryITestBase::GrantRenameTablePrivilege,
+      &MasterAuthzITestBase::RenameTable,
+      &MasterAuthzITestBase::GrantRenameTablePrivilege,
       "RenameTable"
     },
     {
-      &SentryITestBase::GetTableSchema,
-      &SentryITestBase::GrantGetMetadataTablePrivilege,
+      &MasterAuthzITestBase::GetTableSchema,
+      &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
       "GetTableSchema"
     },
     {
-      &SentryITestBase::GetTableLocations,
-      &SentryITestBase::GrantGetMetadataTablePrivilege,
+      &MasterAuthzITestBase::GetTableLocations,
+      &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
       "GetTableLocations"
     },
     {
-      &SentryITestBase::IsCreateTableDone,
-      &SentryITestBase::GrantGetMetadataTablePrivilege,
+      &MasterAuthzITestBase::IsCreateTableDone,
+      &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
       "IsCreateTableDone"
     },
     {
-      &SentryITestBase::IsAlterTableDone,
-      &SentryITestBase::GrantGetMetadataTablePrivilege,
+      &MasterAuthzITestBase::IsAlterTableDone,
+      &MasterAuthzITestBase::GrantGetMetadataTablePrivilege,
       "IsAlterTableDone"
     },
 };
 
 INSTANTIATE_TEST_CASE_P(AuthzFuncCombinations,
                         AuthzErrorHandlingTest,
-                        ::testing::ValuesIn(kAuthzFuncCombinations));
+                        ::testing::Combine(
+                            ::testing::Values(kSentry, kRanger),
+                            ::testing::ValuesIn(kAuthzFuncCombinations)),
+                        [] (const testing::TestParamInfo<AuthzErrorHandlingTest::ParamType>& info) {
+                          return Substitute("$0_$1", HarnessEnumToString(std::get<0>(info.param)),
+                                            std::get<1>(info.param).description);
+                        });
 
 // Class for test scenarios verifying functionality of managing AuthzProvider's
 // privileges cache via Kudu RPC.
-class SentryAuthzProviderCacheITest : public SentryITestBase {
+class SentryAuthzProviderCacheITest : public MasterAuthzITestBase {
  public:
-  bool IsAuthzPrivilegeCacheEnabled() const override {
-    return true;
+  void SetUp() override {
+    NO_FATALS(MasterAuthzITestBase::SetUp());
+    NO_FATALS(SetUpCluster(kSentryWithCache));
   }
 
   Status ResetCache() {
@@ -859,7 +1278,7 @@ TEST_F(SentryAuthzProviderCacheITest, AlterTable) {
     auto s = table_alterer->Alter();
     ASSERT_TRUE(s.ok()) << s.ToString();
   }
-  ASSERT_OK(StopSentry());
+  ASSERT_OK(StopAuthzProvider());
   {
     unique_ptr<KuduTableAlterer> table_alterer(
         client_->NewTableAlterer(table_name)->DropColumn("int16_val"));
@@ -878,7 +1297,7 @@ TEST_F(SentryAuthzProviderCacheITest, AlterTable) {
   }
 
   // Try to do the same after starting Sentry back. It should be a success.
-  ASSERT_OK(StartSentry());
+  ASSERT_OK(StartAuthzProvider());
   {
     unique_ptr<KuduTableAlterer> table_alterer(
         client_->NewTableAlterer(table_name)->DropColumn("int32_val"));
@@ -975,9 +1394,7 @@ TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) {
 
   // Grant CREATE TABLE and METADATA privileges on the database.
   ASSERT_OK(GrantCreateTablePrivilege({ kDatabaseName }));
-  ASSERT_OK(AlterRoleGrantPrivilege(
-      sentry_client_.get(), kDevRole,
-      GetDatabasePrivilege(kDatabaseName, "METADATA")));
+  ASSERT_OK(GrantGetMetadataDatabasePrivilege({ kDatabaseName }));
 
   // Make sure it's possible to create a table in the database. This also
   // populates the privileges cache with information on the privileges
@@ -993,7 +1410,7 @@ TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) {
     ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   }
 
-  ASSERT_OK(StopSentry());
+  ASSERT_OK(StopAuthzProvider());
 
   // CreateTable() with operation timeout longer than HMS --> Sentry
   // communication timeout successfully completes. After failing to push
@@ -1039,7 +1456,7 @@ TEST_F(SentryAuthzProviderCacheITest, DISABLED_CreateTables) {
     ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
   }
 
-  ASSERT_OK(StartSentry());
+  ASSERT_OK(StartAuthzProvider());
 
   // Try to create the table after starting Sentry back: it should be a success.
   ASSERT_OK(CreateKuduTable(kDatabaseName, "t2"));
@@ -1079,7 +1496,7 @@ class AuthzCacheControlTest : public ExternalMiniClusterITestBase {
  public:
   void SetUp() override {
     ExternalMiniClusterITestBase::SetUp();
-    cluster::ExternalMiniClusterOptions opts;
+    ExternalMiniClusterOptions opts;
     opts.enable_kerberos = true;
     StartClusterWithOpts(opts);
   }
diff --git a/src/kudu/integration-tests/master_hms-itest.cc b/src/kudu/integration-tests/master_hms-itest.cc
index 5f5c343..5bca58a 100644
--- a/src/kudu/integration-tests/master_hms-itest.cc
+++ b/src/kudu/integration-tests/master_hms-itest.cc
@@ -36,12 +36,13 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/hms/hive_metastore_types.h"
 #include "kudu/hms/hms_client.h"
-#include "kudu/hms/mini_hms.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/hms_itest-base.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/security/test/mini_kdc.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -60,11 +61,10 @@ using strings::Substitute;
 namespace kudu {
 
 // Test Master <-> HMS catalog synchronization.
-class MasterHmsTest : public HmsITestBase {
+class MasterHmsTest : public ExternalMiniClusterITestBase {
  public:
-
   void SetUp() override {
-    HmsITestBase::SetUp();
+    ExternalMiniClusterITestBase::SetUp();
 
     ExternalMiniClusterOptions opts;
     opts.hms_mode = GetHmsMode();
@@ -78,17 +78,73 @@ class MasterHmsTest : public HmsITestBase {
     thrift::ClientOptions hms_opts;
     hms_opts.enable_kerberos = EnableKerberos();
     hms_opts.service_principal = "hive";
-    hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
-    ASSERT_OK(hms_client_->Start());
+    ASSERT_OK(harness_.RestartHmsClient(cluster_, hms_opts));
   }
 
   void TearDown() override {
-    if (hms_client_) {
-      ASSERT_OK(hms_client_->Stop());
+    if (harness_.hms_client()) {
+      ASSERT_OK(harness_.hms_client()->Stop());
     }
-    HmsITestBase::TearDown();
+    ExternalMiniClusterITestBase::TearDown();
+  }
+
+  Status StartHms() {
+    return harness_.StartHms(cluster_);
+  }
+
+  Status StopHms() {
+    return harness_.StopHms(cluster_);
+  }
+
+  Status CreateDatabase(const std::string& database_name) {
+    return harness_.CreateDatabase(database_name);
+  }
+
+  Status CreateKuduTable(const std::string& database_name,
+                         const std::string& table_name,
+                         MonoDelta timeout = {}) {
+    return HmsITestHarness::CreateKuduTable(database_name, table_name, client_, timeout);
   }
 
+  Status CreateHmsTable(const std::string& database_name,
+                        const std::string& table_name,
+                        const std::string& table_type = hms::HmsClient::kManagedTable,
+                        const boost::optional<const std::string&>& kudu_table_name = boost::none) {
+    return harness_.CreateHmsTable(database_name, table_name, table_type, kudu_table_name);
+  }
+
+  Status RenameHmsTable(const std::string& database_name,
+                        const std::string& old_table_name,
+                        const std::string& new_table_name) {
+    return harness_.RenameHmsTable(database_name, old_table_name, new_table_name);
+  }
+
+  Status AlterHmsTableDropColumns(const std::string& database_name,
+                                  const std::string& table_name) {
+    return harness_.AlterHmsTableDropColumns(database_name, table_name);
+  }
+
+  Status AlterHmsTableExternalPurge(const std::string& database_name,
+                                    const std::string& table_name) {
+    return harness_.AlterHmsTableExternalPurge(database_name, table_name);
+  }
+
+  void CheckTable(const std::string& database_name,
+                  const std::string& table_name,
+                  const boost::optional<const std::string&>& user,
+                  const std::string& table_type = hms::HmsClient::kManagedTable) {
+    harness_.CheckTable(database_name, table_name, user, cluster_, client_, table_type);
+  }
+
+  void CheckTableDoesNotExist(const std::string& database_name,
+                              const std::string& table_name) {
+    harness_.CheckTableDoesNotExist(database_name, table_name, client_);
+  }
+
+
+ protected:
+
+  HmsITestHarness harness_;
  private:
 
   virtual HmsMode GetHmsMode() {
@@ -125,7 +181,7 @@ TEST_F(MasterHmsTest, TestCreateTable) {
   ASSERT_STR_CONTAINS(s.ToString(), "create_db.☃");
 
   // Drop the HMS entry and create the table through Kudu.
-  ASSERT_OK(hms_client_->DropTable(hms_database_name, hms_table_name));
+  ASSERT_OK(harness_.hms_client()->DropTable(hms_database_name, hms_table_name));
   ASSERT_OK(CreateKuduTable(hms_database_name, hms_table_name));
   NO_FATALS(CheckTable(hms_database_name, hms_table_name, /*user=*/none));
 
@@ -213,7 +269,7 @@ TEST_F(MasterHmsTest, TestRenameTable) {
 
   // Check that the two tables still exist.
   vector<string> tables;
-  ASSERT_OK(hms_client_->GetTableNames("db", &tables));
+  ASSERT_OK(harness_.hms_client()->GetTableNames("db", &tables));
   std::sort(tables.begin(), tables.end());
   ASSERT_EQ(tables, vector<string>({ "b", "d" })) << tables;
 
@@ -289,7 +345,7 @@ TEST_F(MasterHmsTest, TestAlterTable) {
   ASSERT_OK(client_->TableExists("default.c", &exists));
   ASSERT_TRUE(exists);
   hive::Table hms_table;
-  ASSERT_OK(hms_client_->GetTable("default", "a", &hms_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "a", &hms_table));
 }
 
 TEST_F(MasterHmsTest, TestDeleteTable) {
@@ -297,7 +353,7 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
   ASSERT_OK(CreateKuduTable("default", "a"));
   NO_FATALS(CheckTable("default", "a", /*user=*/ none));
   hive::Table hms_table;
-  ASSERT_OK(hms_client_->GetTable("default", "a", &hms_table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "a", &hms_table));
 
   ASSERT_OK(client_->DeleteTable("default.a"));
   NO_FATALS(CheckTableDoesNotExist("default", "a"));
@@ -306,10 +362,10 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
   ASSERT_OK(CreateKuduTable("default", "b"));
   NO_FATALS(CheckTable("default", "b", /*user=*/ none));
   hive::Table hms_table_b;
-  ASSERT_OK(hms_client_->GetTable("default", "b", &hms_table_b));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "b", &hms_table_b));
   shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable("default.b", &table));
-  ASSERT_OK(hms_client_->DropTable("default", "b"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
   ASSERT_EVENTUALLY([&] {
       NO_FATALS(CheckTableDoesNotExist("default", "b"));
   });
@@ -322,7 +378,7 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
   NO_FATALS(CheckTable("default", "b", /*user=*/ none, hms::HmsClient::kExternalTable));
   shared_ptr<KuduTable> table2;
   ASSERT_OK(client_->OpenTable("default.b", &table2));
-  ASSERT_OK(hms_client_->DropTable("default", "b"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
   ASSERT_EVENTUALLY([&] {
     NO_FATALS(CheckTableDoesNotExist("default", "b"));
   });
@@ -346,15 +402,15 @@ TEST_F(MasterHmsTest, TestDeleteTable) {
   ASSERT_OK(CreateKuduTable("default", "d"));
   NO_FATALS(CheckTable("default", "d", /*user=*/ none));
   hive::Table hms_table_d;
-  ASSERT_OK(hms_client_->GetTable("default", "d", &hms_table_d));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "d", &hms_table_d));
   ASSERT_OK(client_->DeleteTableInCatalogs("default.d", false));
   s = client_->OpenTable(Substitute("$0.$1", "default", "d"), &table);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-  ASSERT_OK(hms_client_->GetTable("default", "d", &hms_table_d));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "d", &hms_table_d));
 
   // Create and drop a non-Kudu ('external') HMS table entry and ensure Kudu allows it.
   ASSERT_OK(CreateHmsTable("default", "externalTable", HmsClient::kExternalTable));
-  ASSERT_OK(hms_client_->DropTable("default", "externalTable"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "externalTable"));
 }
 
 TEST_F(MasterHmsTest, TestNotificationLogListener) {
@@ -374,7 +430,7 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
 
   // Drop the table in the HMS, and ensure that the notification log listener
   // detects the drop and updates the Kudu catalog accordingly.
-  ASSERT_OK(hms_client_->DropTable("default", "b"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "b"));
   ASSERT_EVENTUALLY([&] {
     NO_FATALS(CheckTableDoesNotExist("default", "b"));
   });
@@ -404,7 +460,7 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
 
   // Scenario 1: drop from the HMS first.
   ASSERT_OK(CreateKuduTable("default", "a"));
-  ASSERT_OK(hms_client_->DropTable("default", "a"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "a"));
   Status s = client_->DeleteTable("default.a");
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   NO_FATALS(CheckTableDoesNotExist("default", "a"));
@@ -412,7 +468,7 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
   // Scenario 2: drop from Kudu first.
   ASSERT_OK(CreateKuduTable("default", "a"));
   ASSERT_OK(client_->DeleteTable("default.a"));
-  s = hms_client_->DropTable("default", "a");
+  s = harness_.hms_client()->DropTable("default", "a");
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   NO_FATALS(CheckTableDoesNotExist("default", "a"));
 
@@ -434,7 +490,7 @@ TEST_F(MasterHmsTest, TestNotificationLogListener) {
 
   // Drop the table in the HMS, and ensure that the notification log listener
   // detects the drop and updates the Kudu catalog accordingly.
-  ASSERT_OK(hms_client_->DropTable("default", "e"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "e"));
   ASSERT_EVENTUALLY([&] {
     NO_FATALS(CheckTableDoesNotExist("default", "e"));
   });
@@ -449,17 +505,17 @@ TEST_F(MasterHmsTest, TestIgnoreExternalTables) {
 
   // Drop a table in the HMS and check that it didn't affect the underlying
   // Kudu table.
-  ASSERT_OK(hms_client_->DropTable("default", "ext1"));
+  ASSERT_OK(harness_.hms_client()->DropTable("default", "ext1"));
   shared_ptr<KuduTable> table;
   ASSERT_OK(client_->OpenTable(kManagedTableName, &table));
 
   // Do the same, but rename the HMS table.
   hive::Table ext;
-  ASSERT_OK(hms_client_->GetTable("default", "ext2", &ext));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "ext2", &ext));
   ext.tableName = "ext3";
-  ASSERT_OK(hms_client_->AlterTable("default", "ext2", ext));
+  ASSERT_OK(harness_.hms_client()->AlterTable("default", "ext2", ext));
   ASSERT_OK(client_->OpenTable(kManagedTableName, &table));
-  Status s = hms_client_->GetTable("default", "ext2", &ext);
+  Status s = harness_.hms_client()->GetTable("default", "ext2", &ext);
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
 
   // Alter the table in Kudu, the external tables in the HMS will not be
@@ -467,7 +523,7 @@ TEST_F(MasterHmsTest, TestIgnoreExternalTables) {
   unique_ptr<KuduTableAlterer> table_alterer(
       client_->NewTableAlterer(kManagedTableName)->RenameTo("default.other"));
   ASSERT_OK(table_alterer->Alter());
-  ASSERT_OK(hms_client_->GetTable("default", "ext3", &ext));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "ext3", &ext));
   ASSERT_EQ(kManagedTableName, ext.parameters[HmsClient::kKuduTableNameKey]);
 }
 
@@ -615,7 +671,7 @@ TEST_F(MasterHmsKerberizedTest, TestTableOwnership) {
   // Create a table as the user and ensure that the ownership is set correctly.
   ASSERT_OK(CreateKuduTable("default", "my_table"));
   hive::Table table;
-  ASSERT_OK(hms_client_->GetTable("default", "my_table", &table));
+  ASSERT_OK(harness_.hms_client()->GetTable("default", "my_table", &table));
   ASSERT_EQ("test-user", table.owner);
 }
 } // namespace kudu
diff --git a/src/kudu/integration-tests/ts_sentry-itest.cc b/src/kudu/integration-tests/ts_sentry-itest.cc
index 5553262..772aac3 100644
--- a/src/kudu/integration-tests/ts_sentry-itest.cc
+++ b/src/kudu/integration-tests/ts_sentry-itest.cc
@@ -42,6 +42,7 @@
 #include "kudu/hms/hms_client.h"
 #include "kudu/hms/mini_hms.h"
 #include "kudu/integration-tests/data_gen_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/hms_itest-base.h"
 #include "kudu/master/sentry_authz_provider-test-base.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
@@ -75,6 +76,7 @@ using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableAlterer;
 using kudu::client::KuduTableCreator;
+using kudu::cluster::ExternalMiniCluster;
 using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::master::AlterRoleGrantPrivilege;
 using kudu::master::CreateRoleAndAddToGroups;
@@ -241,33 +243,28 @@ Status PerformAction(const RWPrivileges& privileges,
   return Status::OK();
 }
 
+// Note: groups and users therein are statically provided to MiniSentry (see
+// mini_sentry.cc). We expect Sentry to be aware of users "user[0-2]".
+constexpr int kNumUsers = 3;
+constexpr const char* kAdminGroup = "admin";
+
+constexpr int kNumTables = 3;
+constexpr int kNumColsPerTable = 3;
+constexpr const char* kDb = "db";
+constexpr const char* kTablePrefix = "table";
+constexpr const char* kAdminRole = "kudu-admin";
+
+constexpr int kAuthzTokenTTLSecs = 1;
+constexpr int kAuthzCacheTTLMultiplier = 3;
+
 } // anonymous namespace
 
 // These tests will use the HMS and Sentry, and thus, are very slow.
 // SKIP_IF_SLOW_NOT_ALLOWED() should be the very first thing called in the body
 // of every test based on this test class.
-class TSSentryITest : public HmsITestBase {
+class TSSentryITestHarness : public HmsITestHarness {
  public:
-  // Note: groups and users therein are statically provided to MiniSentry (see
-  // mini_sentry.cc). We expect Sentry to be aware of users "user[0-2]".
-  static constexpr int kNumUsers = 3;
-  static constexpr const char* kAdminGroup = "admin";
-
-  static constexpr int kNumTables = 3;
-  static constexpr int kNumColsPerTable = 3;
-  static constexpr const char* kDb = "db";
-  static constexpr const char* kTablePrefix = "table";
-  static constexpr const char* kAdminRole = "kudu-admin";
-
-  static constexpr int kAuthzTokenTTLSecs = 1;
-  static constexpr int kAuthzCacheTTLMultiplier = 3;
-
-  void SetUp() override {
-    SKIP_IF_SLOW_NOT_ALLOWED();
-    for (int u = 0; u < kNumUsers; u++) {
-      users_.emplace_back(Substitute("user$0", u));
-    }
-
+  ExternalMiniClusterOptions GetClusterOpts() {
     ExternalMiniClusterOptions opts;
     opts.enable_kerberos = true;
     opts.enable_sentry = true;
@@ -284,41 +281,50 @@ class TSSentryITest : public HmsITestBase {
     opts.extra_tserver_flags.emplace_back(
         Substitute("--user_acl=$0", JoinStrings(users_, ",")));
     opts.extra_tserver_flags.emplace_back("--tserver_enforce_access_control=true");
-    NO_FATALS(StartClusterWithOpts(std::move(opts)));
-    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
-    ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
 
+    return opts;
+  }
+
+  Status SetUpExternalServiceClients(const unique_ptr<ExternalMiniCluster>& cluster) {
     // Set up the HMS client so we can set up a database.
     thrift::ClientOptions hms_opts;
     hms_opts.enable_kerberos = true;
     hms_opts.service_principal = "hive";
-    hms_client_.reset(new hms::HmsClient(cluster_->hms()->address(), hms_opts));
-    ASSERT_OK(hms_client_->Start());
+    hms_client_.reset(new hms::HmsClient(cluster->hms()->address(), hms_opts));
+    RETURN_NOT_OK(hms_client_->Start());
 
     // Set up the Sentry client so we can set up privileges.
     thrift::ClientOptions sentry_opts;
     sentry_opts.enable_kerberos = true;
     sentry_opts.service_principal = "sentry";
-    sentry_client_.reset(new SentryClient(cluster_->sentry()->address(), sentry_opts));
-    ASSERT_OK(sentry_client_->Start());
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup));
-    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kAdminRole,
+    sentry_client_.reset(new SentryClient(cluster->sentry()->address(), sentry_opts));
+    RETURN_NOT_OK(sentry_client_->Start());
+
+    return Status::OK();
+  }
+
+  Status SetUpCredentials() {
+    RETURN_NOT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kAdminRole, kAdminGroup));
+    RETURN_NOT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kAdminRole,
         GetDatabasePrivilege(kDb, "ALL", TSentryGrantOption::DISABLED)));
 
-    // Create the database in the HMS.
-    ASSERT_OK(CreateDatabase(kDb));
+    return Status::OK();
+  }
 
-    // Create a client as the "kudu" user, who now has admin privileges.
-    ASSERT_OK(cluster_->CreateClient(nullptr, &admin_client_));
+  Status SetUpTables() {
+    // Create the database in the HMS.
+    RETURN_NOT_OK(CreateDatabase(kDb));
 
     // Finally populate a set of column names to use for our tables.
     for (int i = 0; i < kNumColsPerTable; i++) {
       cols_.emplace_back(Substitute("col$0", i));
     }
+
+    return Status::OK();
   }
 
   // Creates a table named 'table_ident' with 'kNumColsPerTable' columns.
-  Status CreateTable(const string& table_ident) {
+  Status CreateTable(const string& table_ident, const shared_ptr<KuduClient>& client) {
     KuduSchema schema;
     KuduSchemaBuilder b;
     auto iter = cols_.begin();
@@ -327,7 +333,7 @@ class TSSentryITest : public HmsITestBase {
       b.AddColumn(*iter++)->Type(KuduColumnSchema::INT32);
     }
     RETURN_NOT_OK(b.Build(&schema));
-    unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
     return table_creator->table_name(table_ident)
         .schema(&schema)
         .set_range_partition_columns({ "col0" })
@@ -335,18 +341,15 @@ class TSSentryITest : public HmsITestBase {
         .Create();
   }
 
-  void TearDown() override {
-    SKIP_IF_SLOW_NOT_ALLOWED();
-    HmsITestBase::TearDown();
+  void AddUsers() {
+    for (int u = 0; u < kNumUsers; u++) {
+      users_.emplace_back(Substitute("user$0", u));
+    }
   }
 
- protected:
   // A Sentry client with which to grant privileges.
   unique_ptr<SentryClient> sentry_client_;
 
-  // Kudu client with which to perform admin operations.
-  shared_ptr<KuduClient> admin_client_;
-
   // A list of users that may try to do things.
   vector<string> users_;
 
@@ -354,6 +357,42 @@ class TSSentryITest : public HmsITestBase {
   vector<string> cols_;
 };
 
+class TSSentryITest : public ExternalMiniClusterITestBase {
+ public:
+  void SetUp() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    ExternalMiniClusterITestBase::SetUp();
+    harness_.AddUsers();
+    ExternalMiniClusterOptions opts = harness_.GetClusterOpts();
+    NO_FATALS(StartClusterWithOpts(std::move(opts)));
+
+    ASSERT_OK(cluster_->kdc()->CreateUserPrincipal("kudu"));
+    ASSERT_OK(cluster_->kdc()->Kinit("kudu"));
+
+    ASSERT_OK(harness_.SetUpExternalServiceClients(cluster_));
+    ASSERT_OK(harness_.SetUpCredentials());
+    ASSERT_OK(harness_.SetUpTables());
+
+    // Create a client as the "kudu" user, who now has admin privileges.
+    ASSERT_OK(cluster_->CreateClient(nullptr, &admin_client_));
+  }
+
+  Status CreateTable(const string& table_ident) {
+    return harness_.CreateTable(table_ident, client_);
+  }
+
+  void TearDown() override {
+    SKIP_IF_SLOW_NOT_ALLOWED();
+    ExternalMiniClusterITestBase::TearDown();
+  }
+
+ protected:
+  // Kudu client with which to perform admin operations.
+  shared_ptr<KuduClient> admin_client_;
+
+  TSSentryITestHarness harness_;
+};
+
 // Tests authorizing read and write operations coming from multiple concurrent
 // users for multiple tables.
 TEST_F(TSSentryITest, TestReadsAndWrites) {
@@ -376,16 +415,17 @@ TEST_F(TSSentryITest, TestReadsAndWrites) {
   // Set up a bunch of clients for each user.
   unordered_map<string, vector<shared_ptr<KuduClient>>> user_to_clients;
   ThreadSafeRandom prng(SeedRandom());
-  unordered_set<string> cols(cols_.begin(), cols_.end());
+  unordered_set<string> cols(harness_.cols_.begin(), harness_.cols_.end());
   static constexpr int kNumClientsPerUser = 4;
   for (int i = 0; i < kNumUsers; i++) {
-    const string& user = users_[i];
+    const string& user = harness_.users_[i];
     // Register the user with the KDC, and add a role to the user's group
     // (provided to MiniSentry in mini_sentry.cc).
     ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(user));
     ASSERT_OK(cluster_->kdc()->Kinit(user));
     const string role = Substitute("role$0", i);
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), role, Substitute("group$0", i)));
+    ASSERT_OK(CreateRoleAndAddToGroups(harness_.sentry_client_.get(),
+                                       role, Substitute("group$0", i)));
 
     // Set up multiple clients for each user.
     vector<shared_ptr<KuduClient>> clients;
@@ -402,11 +442,11 @@ TEST_F(TSSentryITest, TestReadsAndWrites) {
     for (const string& table_name : tables) {
       RWPrivileges granted_privileges = GeneratePrivileges(cols, &prng);
       for (const auto& wp : granted_privileges.table_write_privileges) {
-        ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role,
+        ASSERT_OK(AlterRoleGrantPrivilege(harness_.sentry_client_.get(), role,
                   GetTablePrivilege(kDb, table_name, WritePrivilegeToString(wp))));
       }
       for (const auto& col : granted_privileges.column_scan_privileges) {
-        ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role,
+        ASSERT_OK(AlterRoleGrantPrivilege(harness_.sentry_client_.get(), role,
                   GetColumnPrivilege(kDb, table_name, col, "SELECT")));
       }
       RWPrivileges not_granted_privileges = ComplementaryPrivileges(cols, granted_privileges);
@@ -428,7 +468,7 @@ TEST_F(TSSentryITest, TestReadsAndWrites) {
       t.join();
     }
   });
-  for (const string& user : users_) {
+  for (const string& user : harness_.users_) {
     // Start a thread for every user that performs a bunch of operations.
     const auto* const table_to_privileges = FindOrNull(user_to_privileges, user);
     for (const auto& client_sp : FindOrDie(user_to_clients, user)) {
@@ -478,7 +518,7 @@ TEST_F(TSSentryITest, TestAlters) {
   ASSERT_OK(cluster_->kdc()->CreateUserPrincipal(user));
   ASSERT_OK(cluster_->kdc()->Kinit(user));
   const string role = "role0";
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), role, "group0"));
+  ASSERT_OK(CreateRoleAndAddToGroups(harness_.sentry_client_.get(), role, "group0"));
 
   shared_ptr<KuduClient> user_client;
   ASSERT_OK(cluster_->CreateClient(nullptr, &user_client));
@@ -486,13 +526,13 @@ TEST_F(TSSentryITest, TestAlters) {
   // Note: we only need privileges on the metadata for OpenTable() calls.
   // METADATA isn't a first-class Sentry privilege and won't get carried over
   // on table rename, so we just grant INSERT privileges.
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role,
+  ASSERT_OK(AlterRoleGrantPrivilege(harness_.sentry_client_.get(), role,
             GetTablePrivilege(kDb, kTableName, "INSERT")));
 
   // First, grant privileges on a new column that doesn't yet exist. Once that
   // column is created, we should be able to scan it immediately.
   const string new_column = Substitute("col$0", kNumColsPerTable);
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role,
+  ASSERT_OK(AlterRoleGrantPrivilege(harness_.sentry_client_.get(), role,
             GetColumnPrivilege(kDb, kTableName, new_column, "SELECT")));
   {
     unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_ident));
@@ -507,7 +547,7 @@ TEST_F(TSSentryITest, TestAlters) {
   // Since privileges are cached, even though we've granted privileges, clients
   // will use the cached privilege and not be authorized for a bit.
   const string another_column = Substitute("col$0", kNumColsPerTable + 1);
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), role,
+  ASSERT_OK(AlterRoleGrantPrivilege(harness_.sentry_client_.get(), role,
             GetColumnPrivilege(kDb, kTableName, another_column, "SELECT")));
   {
     unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_ident));
diff --git a/src/kudu/ranger/mini_ranger-test.cc b/src/kudu/ranger/mini_ranger-test.cc
index 9716938..bb0ec29 100644
--- a/src/kudu/ranger/mini_ranger-test.cc
+++ b/src/kudu/ranger/mini_ranger-test.cc
@@ -24,7 +24,9 @@
 #include <gtest/gtest.h>
 
 #include "kudu/ranger/ranger.pb.h"
-#include "kudu/util/status.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
@@ -54,12 +56,11 @@ TEST_F(MiniRangerTest, TestGrantPrivilege) {
   policy.databases.emplace_back("foo");
   policy.tables.emplace_back("bar");
   policy.items.emplace_back(std::move(item));
-  policy.name = "test1";
 
   ASSERT_OK(ranger_.AddPolicy(std::move(policy)));
 }
 
-TEST_F(MiniRangerTest, TestGrantSamePrivilegeAfterRestart) {
+TEST_F(MiniRangerTest, TestPersistence) {
   PolicyItem item;
   item.first.emplace_back("testuser");
   item.second.emplace_back(ActionPB::ALTER);
@@ -68,18 +69,18 @@ TEST_F(MiniRangerTest, TestGrantSamePrivilegeAfterRestart) {
   policy.databases.emplace_back("foo");
   policy.tables.emplace_back("bar");
   policy.items.emplace_back(std::move(item));
-  policy.name = "test1";
 
   ASSERT_OK(ranger_.AddPolicy(policy));
 
   ASSERT_OK(ranger_.Stop());
   ASSERT_OK(ranger_.Start());
 
-  const string kExpectedError = "Another policy already exists for matching resource";
-
-  Status s = ranger_.AddPolicy(std::move(policy));
-  ASSERT_TRUE(s.IsRemoteError());
-  ASSERT_STR_CONTAINS(s.ToString(), kExpectedError);
+  EasyCurl curl;
+  curl.set_auth(CurlAuthType::BASIC, "admin", "admin");
+  faststring result;
+  ASSERT_OK(curl.FetchURL(JoinPathSegments(ranger_.admin_url(), "service/plugins/policies/count"),
+                          &result));
+  ASSERT_EQ("1", result.ToString());
 }
 
 } // namespace ranger
diff --git a/src/kudu/ranger/mini_ranger.cc b/src/kudu/ranger/mini_ranger.cc
index 9d5c284..8b411d6 100644
--- a/src/kudu/ranger/mini_ranger.cc
+++ b/src/kudu/ranger/mini_ranger.cc
@@ -20,9 +20,12 @@
 #include <csignal>
 #include <ostream>
 #include <string>
+#include <vector>
 
 #include <glog/logging.h>
 
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/postgres/mini_postgres.h"
 #include "kudu/ranger/mini_ranger_configs.h"
@@ -40,7 +43,7 @@
 #include "kudu/util/test_util.h"
 
 using std::string;
-using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 static constexpr int kRangerStartTimeoutMs = 60000;
@@ -254,9 +257,23 @@ Status MiniRanger::CreateKuduService() {
 }
 
 Status MiniRanger::AddPolicy(AuthorizationPolicy policy) {
+  string policy_name = JoinStrings<vector<string>>({JoinStrings(policy.databases, ","),
+                                                    JoinStrings(policy.tables, ","),
+                                                    JoinStrings(policy.columns, ",")}, ";");
+  PolicyKey policy_key(policy.databases, policy.tables, policy.columns);
+  vector<PolicyItem>* items = FindOrNull(policies_, policy_key);
+  if (!items) {
+    policies_.emplace(policy_key, policy.items);
+    items = &policy.items;
+  } else {
+    for (const auto& item : policy.items) {
+      items->emplace_back(item);
+    }
+  }
+
   EasyJson policy_json;
   policy_json.Set("service", "kudu");
-  policy_json.Set("name", policy.name);
+  policy_json.Set("name", policy_name);
   policy_json.Set("isEnabled", true);
 
   EasyJson resources = policy_json.Set("resources", EasyJson::kObject);
@@ -286,7 +303,7 @@ Status MiniRanger::AddPolicy(AuthorizationPolicy policy) {
   }
 
   EasyJson policy_items = policy_json.Set("policyItems", EasyJson::kArray);
-  for (const auto& policy_item : policy.items) {
+  for (const auto& policy_item : *items) {
     EasyJson item = policy_items.PushBack(EasyJson::kObject);
 
     EasyJson users = item.Set("users", EasyJson::kArray);
@@ -302,8 +319,8 @@ Status MiniRanger::AddPolicy(AuthorizationPolicy policy) {
     }
   }
 
-  RETURN_NOT_OK_PREPEND(PostToRanger("service/plugins/policies", std::move(policy_json)),
-                        "Failed to add policy");
+  RETURN_NOT_OK_PREPEND(PostToRanger("service/plugins/policies?deleteIfExists=true",
+                                     policy_json), "Failed to add policy");
   return Status::OK();
 }
 
diff --git a/src/kudu/ranger/mini_ranger.h b/src/kudu/ranger/mini_ranger.h
index 7449f13..5d15278 100644
--- a/src/kudu/ranger/mini_ranger.h
+++ b/src/kudu/ranger/mini_ranger.h
@@ -18,8 +18,10 @@
 #pragma once
 
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <string>
+#include <tuple>
 #include <utility>
 #include <vector>
 
@@ -49,11 +51,16 @@ typedef std::vector<std::string> UserList;
 // cross-product is taken.
 typedef std::pair<UserList, std::vector<ActionPB>> PolicyItem;
 
+// Policy key used for searching policies_ (values are PolicyItems).
+typedef std::tuple<std::vector<std::string>,
+                   std::vector<std::string>,
+                   std::vector<std::string>> PolicyKey;
+
 // The AuthorizationPolicy contains a set of privileges on a resource to one or
 // more users. 'items' is a vector of user-list of actions pair. This struct can
-// be used to create new Ranger policies in tests.
+// be used to create new Ranger policies in tests. The policy name is based on
+// its contents (list of databases, tables and columns).
 struct AuthorizationPolicy {
-  std::string name;
   std::vector<std::string> databases;
   std::vector<std::string> tables;
   std::vector<std::string> columns;
@@ -104,6 +111,10 @@ class MiniRanger {
     policy_poll_interval_ms_ = policy_poll_interval_ms;
   }
 
+  std::string admin_url() const {
+    return ranger_admin_url_;
+  }
+
  private:
   // Starts the Ranger service.
   Status StartRanger() WARN_UNUSED_RESULT;
@@ -180,6 +191,12 @@ class MiniRanger {
   // default is 200ms so that tests don't have to wait too long until freshly
   // created policies can be used.
   uint32_t policy_poll_interval_ms_ = 200;
+
+  // Stores the policies added since this MiniRanger instance was started. This
+  // is used for adding new policy items (lists of users and privileges) to
+  // existing policies (resources): Ranger doesn't support this directly, so the
+  // policy has to be deleted and recreated with the combined items.
+  std::map<PolicyKey, std::vector<PolicyItem>> policies_;
 };
 
 } // namespace ranger


[kudu] 01/02: [ranger] Fix and refactor RangerClient

Posted by ab...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e7044a5cbe1d1f9d36204279c73901d37451a570
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Thu Apr 9 14:58:31 2020 +0200

    [ranger] Fix and refactor RangerClient
    
    While working on integration tests we found a discrepancy handling
    column-level privileges between Ranger and Sentry. This was caused by
    RangerClient returning NotAuthorized when the requested action couldn't
    be authorized on any of the resources. This commit moves all
    authorization logic to the authorization provider instead of the client.
    
    Change-Id: I3fb7272601c9cb7ebe2e250bea728e76894d242a
    Reviewed-on: http://gerrit.cloudera.org:8080/15696
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Attila Bukor <ab...@apache.org>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/master/ranger_authz_provider.cc | 209 +++++++++++++++++++++++--------
 src/kudu/ranger/ranger_client-test.cc    |  42 ++++---
 src/kudu/ranger/ranger_client.cc         |  94 +++-----------
 src/kudu/ranger/ranger_client.h          |  40 +++---
 4 files changed, 219 insertions(+), 166 deletions(-)

diff --git a/src/kudu/master/ranger_authz_provider.cc b/src/kudu/master/ranger_authz_provider.cc
index 8fe4e70..4129dd4 100644
--- a/src/kudu/master/ranger_authz_provider.cc
+++ b/src/kudu/master/ranger_authz_provider.cc
@@ -23,20 +23,16 @@
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/table_util.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/ranger/ranger.pb.h"
 #include "kudu/security/token.pb.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
 DECLARE_string(ranger_config_path);
 
-namespace kudu {
-
-class Env;
-class MetricEntity;
-
-namespace master {
-
 using kudu::security::ColumnPrivilegePB;
 using kudu::security::TablePrivilegePB;
 using kudu::ranger::ActionPB;
@@ -44,6 +40,34 @@ using kudu::ranger::ActionHash;
 using kudu::ranger::RangerClient;
 using std::string;
 using std::unordered_set;
+using strings::Substitute;
+
+namespace kudu {
+
+class Env;
+class MetricEntity;
+
+namespace master {
+
+namespace {
+
+const char* kUnauthorizedAction = "Unauthorized action";
+const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. "
+                                          "Use 'kudu table rename_table' to rename it to "
+                                          "a Ranger-compatible name.";
+
+Status ParseTableIdentifier(const string& table_name, string* db, string* table) {
+  Slice tbl;
+  auto s = ParseRangerTableIdentifier(table_name, db, &tbl);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+  *table = tbl.ToString();
+  return Status::OK();
+}
+
+} // anonymous namespace
 
 RangerAuthzProvider::RangerAuthzProvider(Env* env,
                                          const scoped_refptr<MetricEntity>& metric_entity) :
@@ -61,9 +85,23 @@ Status RangerAuthzProvider::AuthorizeCreateTable(const string& table_name,
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
+
+  string db;
+  string tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
+
+  bool authorized;
   // Table creation requires 'CREATE ON DATABASE' privilege.
-  return client_.AuthorizeAction(user, ActionPB::CREATE, table_name,
-                                 RangerClient::Scope::DATABASE);
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, db, tbl, &authorized,
+                                        RangerClient::Scope::DATABASE));
+
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to CREATE $1", user, table_name);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  return Status::OK();
 }
 
 Status RangerAuthzProvider::AuthorizeDropTable(const string& table_name,
@@ -71,8 +109,20 @@ Status RangerAuthzProvider::AuthorizeDropTable(const string& table_name,
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
-  // Table deletion requires 'DROP ON TABLE' privilege.
-  return client_.AuthorizeAction(user, ActionPB::DROP, table_name);
+
+  string db;
+  string tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
+  bool authorized;
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::DROP, db, tbl, &authorized));
+
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to DROP $1", user, table_name);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  return Status::OK();
 }
 
 Status RangerAuthzProvider::AuthorizeAlterTable(const string& old_table,
@@ -81,16 +131,45 @@ Status RangerAuthzProvider::AuthorizeAlterTable(const string& old_table,
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
+
+  string old_db;
+  string old_tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(old_table, &old_db, &old_tbl));
   // Table alteration (without table rename) requires ALTER ON TABLE.
+  bool authorized;
   if (old_table == new_table) {
-    return client_.AuthorizeAction(user, ActionPB::ALTER, old_table);
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALTER, old_db, old_tbl, &authorized));
+
+    if (PREDICT_FALSE(!authorized)) {
+      LOG(WARNING) << Substitute("User $0 is not authorized to ALTER $1", user, old_table);
+      return Status::NotAuthorized(kUnauthorizedAction);
+    }
+
+    return Status::OK();
   }
 
   // To prevent privilege escalation we require ALL on the old TABLE
   // and CREATE on the new DATABASE for table rename.
-  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_table));
-  return client_.AuthorizeAction(user, ActionPB::CREATE, new_table,
-                                 RangerClient::Scope::DATABASE);
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, old_db, old_tbl, &authorized));
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to perform ALL on $1", user, old_table);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  string new_db;
+  string new_tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(new_table, &new_db, &new_tbl));
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::CREATE, new_db, new_tbl, &authorized,
+                                        RangerClient::Scope::DATABASE));
+
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to CREATE $1", user, new_table);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  return Status::OK();
 }
 
 Status RangerAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
@@ -98,8 +177,22 @@ Status RangerAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
+
+  string db;
+  string tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
+  bool authorized;
   // Get table metadata requires 'METADATA ON TABLE' privilege.
-  return client_.AuthorizeAction(user, ActionPB::METADATA, table_name);
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::METADATA, db, tbl, &authorized));
+
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to access METADATA on $1", user,
+        table_name);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  return Status::OK();
 }
 
 Status RangerAuthzProvider::AuthorizeListTables(const string& user,
@@ -126,9 +219,21 @@ Status RangerAuthzProvider::AuthorizeGetTableStatistics(const string& table_name
   if (IsTrustedUser(user)) {
     return Status::OK();
   }
+  string db;
+  string tbl;
+
+  RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
+  bool authorized;
   // Statistics contain data (e.g. number of rows) that requires the 'SELECT ON TABLE'
   // privilege.
-  return client_.AuthorizeAction(user, ActionPB::SELECT, table_name);
+  RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::SELECT, db, tbl, &authorized));
+
+  if (PREDICT_FALSE(!authorized)) {
+    LOG(WARNING) << Substitute("User $0 is not authorized to SELECT on $1", user, table_name);
+    return Status::NotAuthorized(kUnauthorizedAction);
+  }
+
+  return Status::OK();
 }
 
 Status RangerAuthzProvider::FillTablePrivilegePB(const string& table_name,
@@ -137,7 +242,18 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const string& table_name,
                                                  TablePrivilegePB* pb) {
   DCHECK(pb);
   DCHECK(pb->has_table_id());
-  if (IsTrustedUser(user) || client_.AuthorizeAction(user, ActionPB::ALL, table_name).ok()) {
+
+  string db;
+  string tbl;
+  RETURN_NOT_OK(ParseTableIdentifier(table_name, &db, &tbl));
+
+  bool authorized;
+  if (IsTrustedUser(user)) {
+    authorized = true;
+  } else {
+    RETURN_NOT_OK(client_.AuthorizeAction(user, ActionPB::ALL, db, tbl, &authorized));
+  }
+  if (authorized) {
     pb->set_delete_privilege(true);
     pb->set_insert_privilege(true);
     pb->set_scan_privilege(true);
@@ -154,29 +270,27 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const string& table_name,
 
   // Check if the user has any table-level privileges. If yes, we set them. If
   // select is included, we can also return.
-  auto s = client_.AuthorizeActions(user, table_name, &actions);
-  if (s.ok()) {
-    for (const ActionPB& action : actions) {
-      switch (action) {
-        case ActionPB::DELETE:
-          pb->set_delete_privilege(true);
-          break;
-        case ActionPB::UPDATE:
-          pb->set_update_privilege(true);
-          break;
-        case ActionPB::INSERT:
-          pb->set_insert_privilege(true);
-          break;
-        case ActionPB::SELECT:
-          pb->set_scan_privilege(true);
-          break;
-        default:
-          LOG(WARNING) << "Unexpected action returned by Ranger: " << ActionPB_Name(action);
-          break;
-      }
-      if (pb->scan_privilege()) {
-        return Status::OK();
-      }
+  RETURN_NOT_OK(client_.AuthorizeActions(user, db, tbl, &actions));
+  for (const ActionPB& action : actions) {
+    switch (action) {
+      case ActionPB::DELETE:
+        pb->set_delete_privilege(true);
+        break;
+      case ActionPB::UPDATE:
+        pb->set_update_privilege(true);
+        break;
+      case ActionPB::INSERT:
+        pb->set_insert_privilege(true);
+        break;
+      case ActionPB::SELECT:
+        pb->set_scan_privilege(true);
+        break;
+      default:
+        LOG(WARNING) << "Unexpected action returned by Ranger: " << ActionPB_Name(action);
+        break;
+    }
+    if (pb->scan_privilege()) {
+      return Status::OK();
     }
   }
 
@@ -190,15 +304,10 @@ Status RangerAuthzProvider::FillTablePrivilegePB(const string& table_name,
     column_names.emplace(col.name());
   }
 
-  // If AuthorizeAction returns NotAuthorized, that means no column-level select
-  // is allowed to the user. In this case we return the previous status.
-  // Otherwise we populate schema_pb and return OK.
-  //
-  // TODO(abukor): revisit if it's worth merge this into the previous request
-  if (!client_.AuthorizeActionMultipleColumns(user, ActionPB::SELECT, table_name,
-                                              &column_names).ok()) {
-    return s;
-  }
+
+  // TODO(abukor): revisit if it's worth merging this into the previous request
+  RETURN_NOT_OK(client_.AuthorizeActionMultipleColumns(user, ActionPB::SELECT, db, tbl,
+                                                       &column_names));
 
   for (const auto& col : schema_pb.columns()) {
     if (ContainsKey(column_names, col.name())) {
diff --git a/src/kudu/ranger/ranger_client-test.cc b/src/kudu/ranger/ranger_client-test.cc
index 02fe0cd..c08dd75 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -171,13 +171,16 @@ class RangerClientTest : public KuduTest {
 };
 
 TEST_F(RangerClientTest, TestAuthorizeCreateTableUnauthorized) {
-  auto s = client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar.baz");
-  ASSERT_TRUE(s.IsNotAuthorized());
+  bool authorized;
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "bar", "baz", &authorized));
+  ASSERT_FALSE(authorized);
 }
 
 TEST_F(RangerClientTest, TestAuthorizeCreateTableAuthorized) {
   Allow("jdoe", ActionPB::CREATE, "foo", "bar");
-  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo.bar"));
+  bool authorized;
+  ASSERT_OK(client_.AuthorizeAction("jdoe", ActionPB::CREATE, "foo", "bar", &authorized));
+  ASSERT_TRUE(authorized);
 }
 
 TEST_F(RangerClientTest, TestAuthorizeListNoTables) {
@@ -257,7 +260,7 @@ TEST_F(RangerClientTest, TestAuthorizeScanSubsetAuthorized) {
   columns.emplace("col3");
   columns.emplace("col4");
   ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                   "default.foobar", &columns));
+                                                   "default", "foobar", &columns));
   ASSERT_EQ(2, columns.size());
   ASSERT_TRUE(ContainsKey(columns, "col1"));
   ASSERT_TRUE(ContainsKey(columns, "col3"));
@@ -276,7 +279,7 @@ TEST_F(RangerClientTest, TestAuthorizeScanAllColumnsAuthorized) {
   columns.emplace("col3");
   columns.emplace("col4");
   ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                   "default.foobar", &columns));
+                                                   "default", "foobar", &columns));
   ASSERT_EQ(4, columns.size());
   ASSERT_TRUE(ContainsKey(columns, "col1"));
   ASSERT_TRUE(ContainsKey(columns, "col2"));
@@ -289,10 +292,9 @@ TEST_F(RangerClientTest, TestAuthorizeScanNoColumnsAuthorized) {
   for (int i = 0; i < 4; ++i) {
     columns.emplace(Substitute("col$0", i));
   }
-  auto s = client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
-                                                  "default.foobar", &columns);
-  ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_EQ(4, columns.size());
+  ASSERT_OK(client_.AuthorizeActionMultipleColumns("jdoe", ActionPB::SELECT,
+                                                   "default", "foobar", &columns));
+  ASSERT_EQ(0, columns.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeActionsNoneAuthorized) {
@@ -300,9 +302,8 @@ TEST_F(RangerClientTest, TestAuthorizeActionsNoneAuthorized) {
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  auto s = client_.AuthorizeActions("jdoe", "default.foobar", &actions);
-  ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_EQ(3, actions.size());
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
+  ASSERT_EQ(0, actions.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeActionsSomeAuthorized) {
@@ -311,7 +312,7 @@ TEST_F(RangerClientTest, TestAuthorizeActionsSomeAuthorized) {
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  ASSERT_OK(client_.AuthorizeActions("jdoe", "default.foobar", &actions));
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
   ASSERT_EQ(1, actions.size());
   ASSERT_TRUE(ContainsKey(actions, ActionPB::SELECT));
 }
@@ -324,7 +325,7 @@ TEST_F(RangerClientTest, TestAuthorizeActionsAllAuthorized) {
   actions.emplace(ActionPB::DROP);
   actions.emplace(ActionPB::SELECT);
   actions.emplace(ActionPB::INSERT);
-  ASSERT_OK(client_.AuthorizeActions("jdoe", "default.foobar", &actions));
+  ASSERT_OK(client_.AuthorizeActions("jdoe", "default", "foobar", &actions));
   ASSERT_EQ(3, actions.size());
 }
 
@@ -393,8 +394,9 @@ TEST_F(RangerClientTestBase, TestLogging) {
   }
   // Make a request. It doesn't matter whether it succeeds or not -- debug logs
   // should include info about each request.
-  Status s = client_->AuthorizeAction("user", ActionPB::ALL, "table");
-  ASSERT_TRUE(s.IsNotAuthorized());
+  bool authorized;
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", &authorized));
+  ASSERT_FALSE(authorized);
   {
     // Check that the Ranger client logs some DEBUG messages.
     vector<string> lines;
@@ -412,8 +414,8 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = false;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  s = client_->AuthorizeAction("user", ActionPB::ALL, "table");
-  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", &authorized));
+  ASSERT_FALSE(authorized);
   {
     // Our logs should still contain DEBUG messages since we didn't update the
     // logging configuration.
@@ -428,8 +430,8 @@ TEST_F(RangerClientTestBase, TestLogging) {
   FLAGS_ranger_overwrite_log_config = true;
   client_.reset(new RangerClient(env_, metric_entity_));
   ASSERT_OK(client_->Start());
-  s = client_->AuthorizeAction("user", ActionPB::ALL, "table");
-  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_OK(client_->AuthorizeAction("user", ActionPB::ALL, "db", "table", &authorized));
+  ASSERT_FALSE(authorized);
   {
     // We shouldn't see any DEBUG messages since the client is configured to
     // use INFO-level logging.
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 00352e2..3e6dee1 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -18,7 +18,6 @@
 #include "kudu/ranger/ranger_client.h"
 
 #include <algorithm>
-#include <cstdint>
 #include <cstdlib>
 #include <memory>
 #include <ostream>
@@ -181,7 +180,6 @@ using strings::Substitute;
 
 namespace {
 
-const char* kUnauthorizedAction = "Unauthorized action";
 const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. "
                                           "Use 'kudu table rename_table' to rename it to "
                                           "a Ranger-compatible name.";
@@ -189,15 +187,6 @@ const char* kMainClass = "org.apache.kudu.subprocess.ranger.RangerSubprocessMain
 const char* kRangerClientLogFilename = "kudu-ranger-subprocess";
 const char* kRangerClientPropertiesFilename = "kudu-ranger-subprocess-log4j2.properties";
 
-const char* ScopeToString(RangerClient::Scope scope) {
-  switch (scope) {
-    case RangerClient::Scope::DATABASE: return "database";
-    case RangerClient::Scope::TABLE: return "table";
-  }
-  LOG(FATAL) << static_cast<uint16_t>(scope) << ": unknown scope";
-  __builtin_unreachable();
-}
-
 // Returns the path to the JAR file containing the Ranger subprocess.
 string RangerJarPath() {
   if (FLAGS_ranger_jar_path.empty()) {
@@ -406,20 +395,10 @@ Status RangerClient::Start() {
 }
 
 // TODO(abukor): refactor to avoid code duplication
-Status RangerClient::AuthorizeAction(const string& user_name,
-                                     const ActionPB& action,
-                                     const string& table_name,
+Status RangerClient::AuthorizeAction(const string& user_name, const ActionPB& action,
+                                     const string& database, const string& table, bool* authorized,
                                      Scope scope) {
   DCHECK(subprocess_);
-  string db;
-  Slice tbl;
-
-  auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   RangerRequestListPB req_list;
   RangerResponseListPB resp_list;
   req_list.set_user(user_name);
@@ -427,40 +406,25 @@ Status RangerClient::AuthorizeAction(const string& user_name,
   RangerRequestPB* req = req_list.add_requests();
 
   req->set_action(action);
-  req->set_database(db);
+  req->set_database(database);
   // Only pass the table name if this is table level request.
   if (scope == Scope::TABLE) {
-    req->set_table(tbl.ToString());
+    req->set_table(table);
   }
 
   RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
 
   CHECK_EQ(1, resp_list.responses_size());
-  if (resp_list.responses().begin()->allowed()) {
-    return Status::OK();
-  }
-
-  LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on $2 at scope ($3)",
-                             user_name, ActionPB_Name(action), table_name, ScopeToString(scope));
-  return Status::NotAuthorized(kUnauthorizedAction);
+  *authorized = resp_list.responses().begin()->allowed();
+  return Status::OK();
 }
 
-Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
-                                                    const ActionPB& action,
-                                                    const string& table_name,
+Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name, const ActionPB& action,
+                                                    const string& database, const string& table,
                                                     unordered_set<string>* column_names) {
   DCHECK(subprocess_);
   DCHECK(!column_names->empty());
 
-  string db;
-  Slice tbl;
-
-  auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   RangerRequestListPB req_list;
   RangerResponseListPB resp_list;
   req_list.set_user(user_name);
@@ -468,8 +432,8 @@ Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
   for (const auto& col : *column_names) {
     auto req = req_list.add_requests();
     req->set_action(action);
-    req->set_database(db);
-    req->set_table(tbl.ToString());
+    req->set_database(database);
+    req->set_table(table);
     req->set_column(col);
   }
 
@@ -484,20 +448,13 @@ Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
     }
   }
 
-  if (allowed_columns.empty()) {
-    LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on table $2",
-                               user_name, ActionPB_Name(action), table_name);
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   *column_names = move(allowed_columns);
 
   return Status::OK();
 }
 
-Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
-                                                   const ActionPB& action,
-                                                   unordered_set<string>* table_names) {
+Status RangerClient::AuthorizeActionMultipleTables(const string& user_name, const ActionPB& action,
+                                                   unordered_set<string>* tables) {
   DCHECK(subprocess_);
 
   RangerRequestListPB req_list;
@@ -506,7 +463,7 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
 
   vector<string> orig_table_names;
 
-  for (const auto& table : *table_names) {
+  for (const auto& table : *tables) {
     string db;
     Slice tbl;
 
@@ -534,26 +491,17 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
     }
   }
 
-  *table_names = move(allowed_tables);
+  *tables = move(allowed_tables);
 
   return Status::OK();
 }
 
-Status RangerClient::AuthorizeActions(const string& user_name,
-                                      const string& table_name,
+Status RangerClient::AuthorizeActions(const string& user_name, const string& database,
+                                      const string& table,
                                       unordered_set<ActionPB, ActionHash>* actions) {
   DCHECK(subprocess_);
   DCHECK(!actions->empty());
 
-  string db;
-  Slice tbl;
-
-  auto s = ParseRangerTableIdentifier(table_name, &db, &tbl);
-  if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << Substitute(kDenyNonRangerTableTemplate, table_name);
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   RangerRequestListPB req_list;
   RangerResponseListPB resp_list;
   req_list.set_user(user_name);
@@ -561,8 +509,8 @@ Status RangerClient::AuthorizeActions(const string& user_name,
   for (const auto& action : *actions) {
     auto req = req_list.add_requests();
     req->set_action(action);
-    req->set_database(db);
-    req->set_table(tbl.ToString());
+    req->set_database(database);
+    req->set_table(table);
   }
 
   RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
@@ -576,12 +524,6 @@ Status RangerClient::AuthorizeActions(const string& user_name,
     }
   }
 
-  if (allowed_actions.empty()) {
-    LOG(WARNING) << Substitute("User $0 is not authorized to perform actions $1 on table $2",
-                               user_name, JoinMapped(*actions, ActionPB_Name, ", "), table_name);
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   *actions = move(allowed_actions);
 
   return Status::OK();
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index 79ab45a..8e119f9 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -57,6 +57,11 @@ typedef subprocess::SubprocessProxy<RangerRequestListPB, RangerResponseListPB,
 // coming from this class are environmental, like Java binary location, location
 // of config files, and krb5.conf file (setting it doesn't enable Kerberos, that
 // depends on core-site.xml).
+//
+// These methods return non-OK status only if something goes wrong between the
+// client and the subprocess (e.g. IOError, EndOfFile, Corruption), but they
+// never return NotAuthorized. The authz provider is responsible for returning
+// NotAuthorized based on RangerClient's out parameters and returned Status.
 class RangerClient {
  public:
   // Similar to SentryAuthorizableScope scope which indicates the
@@ -77,35 +82,30 @@ class RangerClient {
   // Starts the RangerClient, initializes the subprocess server.
   Status Start() WARN_UNUSED_RESULT;
 
-  // Authorizes an action on the table. Returns OK if 'user_name' is authorized
-  // to perform 'action' on 'table_name', NotAuthorized otherwise.
+  // Authorizes an action on the table. Sets 'authorized' to true if
+  // 'user_name' is allowed to perform 'action', false otherwise.
   Status AuthorizeAction(const std::string& user_name, const ActionPB& action,
-                         const std::string& table_name, Scope scope = Scope::TABLE)
-      WARN_UNUSED_RESULT;
+                         const std::string& database, const std::string& table, bool* authorized,
+                         Scope scope = Scope::TABLE) WARN_UNUSED_RESULT;
 
-  // Authorizes action on multiple tables. It sets 'table_names' to the
-  // tables the user is authorized to access and returns OK.
+  // Authorizes action on multiple tables. It sets 'tables' to the
+  // tables the user is authorized to access.
   Status AuthorizeActionMultipleTables(const std::string& user_name, const ActionPB& action,
-                                       std::unordered_set<std::string>* table_names)
-      WARN_UNUSED_RESULT;
+                                       std::unordered_set<std::string>* tables)
+    WARN_UNUSED_RESULT;
 
-  // Authorizes action on multiple columns. If there is at least one column that
-  // user is authorized to perform the action on, it sets 'column_names' to the
-  // columns the user is authorized to access and returns OK, NotAuthorized
-  // otherwise.
+  // Authorizes action on multiple columns. It sets 'column_names' to the
+  // columns the user is authorized to access.
   Status AuthorizeActionMultipleColumns(const std::string& user_name, const ActionPB& action,
-                                        const std::string& table_name,
+                                        const std::string& database, const std::string& table,
                                         std::unordered_set<std::string>* column_names)
       WARN_UNUSED_RESULT;
 
-  // Authorizes multiple table-level actions on a single table. If there is at
-  // least one action that user is authorized to perform on the table, it sets
-  // 'actions' to the actions the user is authorized to perform and returns OK,
-  // NotAuthorized otherwise.
-  Status AuthorizeActions(const std::string& user_name,
-                          const std::string& table_name,
-                          std::unordered_set<ActionPB, ActionHash>* actions)
-      WARN_UNUSED_RESULT;
+  // Authorizes multiple table-level actions on a single table. It sets
+  // 'actions' to the actions the user is authorized to perform.
+  Status AuthorizeActions(const std::string& user_name, const std::string& database,
+                          const std::string& table,
+                          std::unordered_set<ActionPB, ActionHash>* actions) WARN_UNUSED_RESULT;
 
   // Replaces the subprocess server in the subprocess proxy.
   void ReplaceServerForTests(std::unique_ptr<subprocess::SubprocessServer> server) {