You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/04/11 01:19:11 UTC

[kudu] 03/04: [master] introduce SentryPrivilegesFetcher

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 511d5bb536b90d1fedfec49fd48b786e5b8e9119
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Wed Mar 20 15:30:14 2019 -0700

    [master] introduce SentryPrivilegesFetcher
    
    This patch incorporates a TTL-based cache into the data paths
    of SentryAuthzProvider by introducing a new entity responsible for
    fetching and caching the information received from Sentry authz
    authority: SentryPrivilegesFetcher.  A cache's entry contains sanitized
    and transformed information received as TListSentryPrivilegesResponse
    from Sentry.
    
    Set the newly introduced `--sentry_authz_cache_capacity_mb`
    command-line flag to 0 to disable caching of authz privilege information
    returned from Sentry.
    
    Change-Id: Idaefacd50736f1f152dae34e76778e17b2e84cbe
    Reviewed-on: http://gerrit.cloudera.org:8080/12833
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/integration-tests/master_sentry-itest.cc  |  32 +-
 src/kudu/master/CMakeLists.txt                     |   2 +
 src/kudu/master/default_authz_provider.h           |   3 +-
 src/kudu/master/sentry_authz_provider-test.cc      | 282 +++++----
 src/kudu/master/sentry_authz_provider.cc           | 450 ++------------
 src/kudu/master/sentry_authz_provider.h            | 120 +---
 src/kudu/master/sentry_privileges_cache_metrics.cc |  77 +++
 src/kudu/master/sentry_privileges_cache_metrics.h  |  35 ++
 ...hz_provider.cc => sentry_privileges_fetcher.cc} | 689 ++++++++++-----------
 ...uthz_provider.h => sentry_privileges_fetcher.h} | 183 +++---
 src/kudu/sentry/sentry_authorizable_scope.cc       |   5 +-
 src/kudu/sentry/sentry_authorizable_scope.h        |   4 +-
 12 files changed, 800 insertions(+), 1082 deletions(-)

diff --git a/src/kudu/integration-tests/master_sentry-itest.cc b/src/kudu/integration-tests/master_sentry-itest.cc
index 74b853f..b0f711d 100644
--- a/src/kudu/integration-tests/master_sentry-itest.cc
+++ b/src/kudu/integration-tests/master_sentry-itest.cc
@@ -269,6 +269,12 @@ class SentryITestBase : public HmsITestBase {
     // authorized.
     opts.extra_master_flags.emplace_back("--trusted_user_acl=impala");
     opts.extra_master_flags.emplace_back("--user_acl=test-user,impala");
+
+    // Enable/disable caching of authz privileges received from Sentry.
+    opts.extra_master_flags.emplace_back(
+        Substitute("--sentry_privileges_cache_capacity_mb=$0",
+                   IsAuthzPrivilegeCacheEnabled() ? 1 : 0));
+
     StartClusterWithOpts(std::move(opts));
 
     // Create principals 'impala' and 'kudu'. Configure to use the latter.
@@ -326,6 +332,10 @@ class SentryITestBase : public HmsITestBase {
     HmsITestBase::TearDown();
   }
 
+  virtual bool IsAuthzPrivilegeCacheEnabled() const {
+    return false;
+  }
+
  protected:
   unique_ptr<SentryClient> sentry_client_;
 };
@@ -467,11 +477,14 @@ TEST_F(MasterSentryTest, DISABLED_TestRenameTablePrivilegeTransfer) {
   NO_FATALS(CheckTable(kDatabaseName, "c", make_optional<const string&>(kAdminUser)));
 }
 
-class TestAuthzTable : public MasterSentryTest,
-                       public ::testing::WithParamInterface<AuthzDescriptor> {};
+class TestAuthzTable :
+    public MasterSentryTest,
+    public ::testing::WithParamInterface<AuthzDescriptor> {
+    // TODO(aserbin): update the test to introduce authz privilege caching
+};
 
 TEST_P(TestAuthzTable, TestAuthorizeTable) {
-  AuthzDescriptor desc = GetParam();
+  const AuthzDescriptor& desc = GetParam();
   const auto table_name = Substitute("$0.$1", desc.database, desc.table_name);
   const auto new_table_name = Substitute("$0.$1", desc.database, desc.new_table_name);
 
@@ -582,7 +595,6 @@ static const AuthzDescriptor kAuthzCombinations[] = {
       ""
     },
 };
-
 INSTANTIATE_TEST_CASE_P(AuthzCombinations,
                         TestAuthzTable,
                         ::testing::ValuesIn(kAuthzCombinations));
@@ -621,10 +633,13 @@ TEST_F(MasterSentryTest, TestMismatchedTable) {
   ASSERT_STR_CONTAINS(s.ToString(), "the table ID refers to a different table");
 }
 
-class AuthzErrorHandlingTest : public SentryITestBase,
-                               public ::testing::WithParamInterface<AuthzFuncs> {};
+class AuthzErrorHandlingTest :
+    public SentryITestBase,
+    public ::testing::WithParamInterface<AuthzFuncs> {
+    // TODO(aserbin): update the test to introduce authz privilege caching
+};
 TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
-  AuthzFuncs funcs = GetParam();
+  const AuthzFuncs& funcs = GetParam();
   const auto table_name = Substitute("$0.$1", kDatabaseName, "non_existent");
   const auto new_table_name = Substitute("$0.$1", kDatabaseName, "b");
 
@@ -635,7 +650,8 @@ TEST_P(AuthzErrorHandlingTest, TestNonExistentTable) {
   ASSERT_TRUE(s.IsNotAuthorized());
   ASSERT_STR_CONTAINS(s.ToString(), "unauthorized action");
 
-  // Ensure that operating on a non-existent table while the Sentry is unreachable fails.
+  // Ensure that operating on a non-existent table fails
+  // while Sentry is unreachable.
   ASSERT_OK(StopSentry());
   s = funcs.do_action(this, table_name, new_table_name);
   ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 73132b5..0b8c40e 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -46,6 +46,8 @@ set(MASTER_SRCS
   placement_policy.cc
   sentry_authz_provider.cc
   sentry_client_metrics.cc
+  sentry_privileges_cache_metrics.cc
+  sentry_privileges_fetcher.cc
   sys_catalog.cc
   ts_descriptor.cc
   ts_manager.cc)
diff --git a/src/kudu/master/default_authz_provider.h b/src/kudu/master/default_authz_provider.h
index 516dab1..47509b0 100644
--- a/src/kudu/master/default_authz_provider.h
+++ b/src/kudu/master/default_authz_provider.h
@@ -34,10 +34,9 @@ namespace master {
 // Default AuthzProvider which always authorizes any operations.
 class DefaultAuthzProvider : public AuthzProvider {
  public:
-
   Status Start() override WARN_UNUSED_RESULT { return Status::OK(); }
 
-  void Stop() override {};
+  void Stop() override {}
 
   Status AuthorizeCreateTable(const std::string& /*table_name*/,
                               const std::string& /*user*/,
diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc
index 853998d..f83534f 100644
--- a/src/kudu/master/sentry_authz_provider-test.cc
+++ b/src/kudu/master/sentry_authz_provider-test.cc
@@ -39,6 +39,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/sentry_authz_provider-test-base.h"
+#include "kudu/master/sentry_privileges_fetcher.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/sentry/mini_sentry.h"
 #include "kudu/sentry/sentry-test-base.h"
@@ -58,6 +59,7 @@
 
 DECLARE_int32(sentry_service_recv_timeout_seconds);
 DECLARE_int32(sentry_service_send_timeout_seconds);
+DECLARE_uint32(sentry_privileges_cache_capacity_mb);
 DECLARE_string(sentry_service_rpc_addresses);
 DECLARE_string(server_name);
 DECLARE_string(trusted_user_acl);
@@ -101,7 +103,7 @@ TEST(SentryAuthzProviderStaticTest, TestTrustedUserAcl) {
 }
 
 // Basic unit test for validations on ill-formed privileges.
-TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
+TEST(SentryPrivilegesFetcherStaticTest, TestPrivilegesWellFormed) {
   const string kDb = "db";
   const string kTable = "table";
   TSentryAuthorizable requested_authorizable;
@@ -113,21 +115,21 @@ TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
     // Privilege with a bogus action set.
     TSentryPrivilege privilege = real_privilege;
     privilege.__set_action("NotAnAction");
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
     // Privilege with a bogus authorizable scope set.
     TSentryPrivilege privilege = real_privilege;
     privilege.__set_privilegeScope("NotAnAuthorizableScope");
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
     // Privilege with a valid, but unexpected scope for the set fields.
     TSentryPrivilege privilege = real_privilege;
     privilege.__set_privilegeScope("COLUMN");
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
@@ -135,21 +137,21 @@ TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
     // requested.
     TSentryPrivilege privilege = real_privilege;
     privilege.__set_dbName("NotTheActualDb");
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
     // Privilege with an field set that isn't meant to be set at its scope.
     TSentryPrivilege privilege = real_privilege;
     privilege.__set_columnName("SomeColumn");
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
     // Privilege with a missing field for its scope.
     TSentryPrivilege privilege = real_privilege;
     privilege.__isset.tableName = false;
-    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_FALSE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
   }
   {
@@ -157,7 +159,7 @@ TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
     SentryAuthorizableScope::Scope granted_scope;
     SentryAction::Action granted_action;
     real_privilege.printTo(LOG(INFO));
-    ASSERT_TRUE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    ASSERT_TRUE(SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
         real_privilege, requested_authorizable, &granted_scope, &granted_action));
     ASSERT_EQ(SentryAuthorizableScope::TABLE, granted_scope);
     ASSERT_EQ(SentryAction::ALL, granted_action);
@@ -166,9 +168,9 @@ TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
 
 class SentryAuthzProviderTest : public SentryTestBase {
  public:
-  const char* const kTestUser = "test-user";
-  const char* const kUserGroup = "user";
-  const char* const kRoleName = "developer";
+  static const char* const kTestUser;
+  static const char* const kUserGroup;
+  static const char* const kRoleName;
 
   void SetUp() override {
     SentryTestBase::SetUp();
@@ -177,11 +179,22 @@ class SentryAuthzProviderTest : public SentryTestBase {
         &metric_registry_, "sentry_auth_provider-test");
 
     // Configure the SentryAuthzProvider flags.
+    FLAGS_sentry_privileges_cache_capacity_mb = CachingEnabled() ? 1 : 0;
     FLAGS_sentry_service_rpc_addresses = sentry_->address().ToString();
     sentry_authz_provider_.reset(new SentryAuthzProvider(metric_entity_));
     ASSERT_OK(sentry_authz_provider_->Start());
   }
 
+  bool KerberosEnabled() const override {
+    // The returned value corresponds to the actual setting of the
+    // --sentry_service_security_mode flag; now it's "kerberos" by default.
+    return true;
+  }
+
+  virtual bool CachingEnabled() const {
+    return true;
+  }
+
   Status StopSentry() {
     RETURN_NOT_OK(sentry_client_->Stop());
     RETURN_NOT_OK(sentry_->Stop());
@@ -194,8 +207,21 @@ class SentryAuthzProviderTest : public SentryTestBase {
     return Status::OK();
   }
 
-  bool KerberosEnabled() const override {
-    return true;
+  Status DropRole() {
+    RETURN_NOT_OK(kudu::master::DropRole(sentry_client_.get(), kRoleName));
+    return sentry_authz_provider_->fetcher_.ResetCache();
+  }
+
+  Status CreateRoleAndAddToGroups() {
+    RETURN_NOT_OK(kudu::master::CreateRoleAndAddToGroups(
+        sentry_client_.get(), kRoleName, kUserGroup));
+    return sentry_authz_provider_->fetcher_.ResetCache();
+  }
+
+  Status AlterRoleGrantPrivilege(const TSentryPrivilege& privilege) {
+    RETURN_NOT_OK(kudu::master::AlterRoleGrantPrivilege(
+        sentry_client_.get(), kRoleName, privilege));
+    return sentry_authz_provider_->fetcher_.ResetCache();
   }
 
 #define GET_GAUGE_READINGS(func_name, counter_name_suffix) \
@@ -218,6 +244,10 @@ class SentryAuthzProviderTest : public SentryTestBase {
   unique_ptr<SentryAuthzProvider> sentry_authz_provider_;
 };
 
+const char* const SentryAuthzProviderTest::kTestUser = "test-user";
+const char* const SentryAuthzProviderTest::kUserGroup = "user";
+const char* const SentryAuthzProviderTest::kRoleName = "developer";
+
 namespace {
 
 const SentryActionsSet kAllActions({
@@ -273,11 +303,12 @@ constexpr const char* kColumn = "column";
 class SentryAuthzProviderFilterPrivilegesTest : public SentryAuthzProviderTest {
  public:
   SentryAuthzProviderFilterPrivilegesTest()
-      : prng_(SeedRandom()) {}
+      : prng_(SeedRandom()) {
+  }
 
   void SetUp() override {
     SentryAuthzProviderTest::SetUp();
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+    ASSERT_OK(CreateRoleAndAddToGroups());
     full_authorizable_.server = FLAGS_server_name;
     full_authorizable_.db = kDb;
     full_authorizable_.table = kTable;
@@ -302,7 +333,7 @@ class SentryAuthzProviderFilterPrivilegesTest : public SentryAuthzProviderTest {
 
     // Select a scope at which we'll mess up the privilege request's field.
     AuthorizableScopesSet nonempty_fields =
-        SentryAuthzProvider::ExpectedNonEmptyFields(scope.scope());
+        SentryPrivilegesFetcher::ExpectedNonEmptyFields(scope.scope());
     if (invalid_privilege == InvalidPrivilege::FLIPPED_FIELD) {
       static const AuthorizableScopesSet kMessUpCandidates = {
         SentryAuthorizableScope::SERVER,
@@ -397,7 +428,7 @@ TEST_F(SentryAuthzProviderFilterPrivilegesTest, TestTablePrivilegePBParsing) {
       // Grant the privilege to the user.
       TSentryPrivilege table_privilege = CreatePrivilege(table_authorizable,
           SentryAuthorizableScope(granted_scope), SentryAction(action));
-      ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, table_privilege));
+      ASSERT_OK(AlterRoleGrantPrivilege(table_privilege));
 
       // All of the privileges imply the table-level action.
       InsertIfNotPresent(&table_privileges, action);
@@ -414,7 +445,7 @@ TEST_F(SentryAuthzProviderFilterPrivilegesTest, TestTablePrivilegePBParsing) {
       // Grant the privilege to the user.
       TSentryPrivilege column_privilege =
           GetColumnPrivilege(kDb, kTable, column_name, ActionToString(action));
-      ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, column_privilege));
+      ASSERT_OK(AlterRoleGrantPrivilege(column_privilege));
 
       if (SentryAction(action).Implies(SentryAction(SentryAction::SELECT))) {
         InsertIfNotPresent(&scannable_columns, FindOrDie(col_name_to_id, column_name));
@@ -481,17 +512,17 @@ TEST_P(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterInvalidResponses)
     for (const auto& ip : kInvalidPrivileges) {
       TSentryPrivilege privilege = CreatePrivilege(full_authorizable_, granted_scope,
                                                    SentryAction(action), ip);
-      ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+      ASSERT_OK(AlterRoleGrantPrivilege(privilege));
     }
   }
   for (const auto& requested_scope : { SentryAuthorizableScope::SERVER,
                                        SentryAuthorizableScope::DATABASE,
                                        SentryAuthorizableScope::TABLE }) {
-    SentryPrivilegesBranch privileges;
-    ASSERT_OK(sentry_authz_provider_->GetSentryPrivileges(
-        requested_scope, table_ident, kTestUser, &privileges));
+    SentryPrivilegesBranch privileges_info;
+    ASSERT_OK(sentry_authz_provider_->fetcher_.GetSentryPrivileges(
+        requested_scope, table_ident, kTestUser, &privileges_info));
     // Kudu should ignore all of the invalid privileges.
-    ASSERT_TRUE(privileges.privileges.empty());
+    ASSERT_TRUE(privileges_info.privileges().empty());
   }
 }
 
@@ -504,19 +535,19 @@ TEST_P(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterValidResponses) {
   for (const auto& action : kAllActions) {
     TSentryPrivilege privilege = CreatePrivilege(full_authorizable_, granted_scope,
                                                  SentryAction(action));
-    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+    ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   }
   for (const auto& requested_scope : { SentryAuthorizableScope::SERVER,
                                        SentryAuthorizableScope::DATABASE,
                                        SentryAuthorizableScope::TABLE }) {
-    SentryPrivilegesBranch privileges;
-    ASSERT_OK(sentry_authz_provider_->GetSentryPrivileges(
-        requested_scope, table_ident, kTestUser, &privileges));
-    ASSERT_EQ(1, privileges.privileges.size());
-    const auto& authorizable_privileges = privileges.privileges[0];
+    SentryPrivilegesBranch privileges_info;
+    ASSERT_OK(sentry_authz_provider_->fetcher_.GetSentryPrivileges(
+        requested_scope, table_ident, kTestUser, &privileges_info));
+    ASSERT_EQ(1, privileges_info.privileges().size());
+    const auto& authorizable_privileges = *privileges_info.privileges().cbegin();
     ASSERT_EQ(GetParam(), authorizable_privileges.scope)
         << ScopeToString(authorizable_privileges.scope);
-    ASSERT_FALSE(authorizable_privileges.granted_privileges.empty());
+    ASSERT_FALSE(authorizable_privileges.allowed_actions.empty());
   }
 }
 
@@ -526,13 +557,13 @@ INSTANTIATE_TEST_CASE_P(GrantedScopes, SentryAuthzProviderFilterPrivilegesScopeT
                                           SentryAuthorizableScope::TABLE,
                                           SentryAuthorizableScope::COLUMN));
 
-
-
 // Test to create tables requiring ALL ON DATABASE with the grant option. This
 // is parameterized on the ALL scope and OWNER actions, which behave
 // identically.
-class CreateTableAuthorizationTest : public SentryAuthzProviderTest,
-                                     public ::testing::WithParamInterface<string> {};
+class CreateTableAuthorizationTest :
+    public SentryAuthzProviderTest,
+    public ::testing::WithParamInterface<string> {
+};
 
 TEST_P(CreateTableAuthorizationTest, TestAuthorizeCreateTable) {
   // Don't authorize create table on a non-existent user.
@@ -546,15 +577,15 @@ TEST_P(CreateTableAuthorizationTest, TestAuthorizeCreateTable) {
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Don't authorize create table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize create table on a user with proper privileges.
   privilege = GetDatabasePrivilege("db", "CREATE");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, kTestUser));
 
   // Table creation with a different owner than the user
@@ -567,7 +598,7 @@ TEST_P(CreateTableAuthorizationTest, TestAuthorizeCreateTable) {
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user");
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
   privilege = GetDatabasePrivilege("db", all, TSentryGrantOption::ENABLED);
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user"));
 }
 
@@ -576,29 +607,29 @@ INSTANTIATE_TEST_CASE_P(AllOrOwner, CreateTableAuthorizationTest,
 
 TEST_F(SentryAuthzProviderTest, TestAuthorizeDropTable) {
   // Don't authorize delete table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   Status s = sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize delete table on a user with proper privileges.
   privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser));
 }
 
 TEST_F(SentryAuthzProviderTest, TestAuthorizeAlterTable) {
   // Don't authorize alter table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   TSentryPrivilege db_privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(db_privilege));
   Status s = sentry_authz_provider_->AuthorizeAlterTable("db.table", "db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize alter table without rename on a user with proper privileges.
   db_privilege = GetDatabasePrivilege("db", "ALTER");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(db_privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeAlterTable("db.table", "db.table", kTestUser));
 
   // Table alteration with rename requires 'ALL ON TABLE <old-table>' and
@@ -608,9 +639,9 @@ TEST_F(SentryAuthzProviderTest, TestAuthorizeAlterTable) {
 
   // Authorize alter table without rename on a user with proper privileges.
   db_privilege = GetDatabasePrivilege("new_db", "CREATE");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, db_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(db_privilege));
   TSentryPrivilege table_privilege = GetTablePrivilege("db", "table", "ALL");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, table_privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(table_privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeAlterTable("db.table",
                                                         "new_db.new_table",
                                                         kTestUser));
@@ -618,38 +649,90 @@ TEST_F(SentryAuthzProviderTest, TestAuthorizeAlterTable) {
 
 TEST_F(SentryAuthzProviderTest, TestAuthorizeGetTableMetadata) {
   // Don't authorize delete table on a user without required privileges.
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   Status s = sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser);
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
 
   // Authorize delete table on a user with proper privileges.
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "SELECT");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser));
 }
 
-// Checks that the SentryAuthzProvider handles reconnecting to Sentry after a connection failure,
-// or service being too busy.
-TEST_F(SentryAuthzProviderTest, TestReconnect) {
+TEST_F(SentryAuthzProviderTest, TestInvalidAction) {
+  ASSERT_OK(CreateRoleAndAddToGroups());
+  TSentryPrivilege privilege = GetDatabasePrivilege("db", "invalid");
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
+  // User has privileges with invalid action cannot operate on the table.
+  Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+}
+
+TEST_F(SentryAuthzProviderTest, TestInvalidAuthzScope) {
+  ASSERT_OK(CreateRoleAndAddToGroups());
+  TSentryPrivilege privilege = GetDatabasePrivilege("db", "ALL");
+  privilege.__set_privilegeScope("invalid");
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
+  // User has privileges with invalid authorizable scope cannot operate
+  // on the table.
+  Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+}
 
-  // Restart SentryAuthzProvider with configured timeout to reduce the run time of this test.
+// Ensures Sentry privileges are case insensitive.
+TEST_F(SentryAuthzProviderTest, TestPrivilegeCaseSensitivity) {
+  ASSERT_OK(CreateRoleAndAddToGroups());
+  TSentryPrivilege privilege = GetDatabasePrivilege("db", "create");
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
+  ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser));
+}
+
+// Whether the authz information received from Sentry is cached or not.
+enum class AuthzCaching {
+  Disabled,
+  Enabled,
+};
+
+// Tests to ensure SentryAuthzProvider enforces access restrictions as expected.
+// Parameterized by whether caching is enabled.
+class SentryAuthzProviderReconnectionTest :
+    public SentryAuthzProviderTest,
+    public ::testing::WithParamInterface<AuthzCaching> {
+ public:
+  bool CachingEnabled() const override {
+    return GetParam() == AuthzCaching::Enabled;
+  }
+};
+INSTANTIATE_TEST_CASE_P(
+    , SentryAuthzProviderReconnectionTest,
+    ::testing::Values(AuthzCaching::Disabled, AuthzCaching::Enabled));
+
+// Checks that the SentryAuthzProvider handles reconnecting to Sentry
+// after a connection failure, or service being too busy.
+TEST_P(SentryAuthzProviderReconnectionTest, ConnectionFailureOrTooBusy) {
+  // Restart SentryAuthzProvider with configured timeout to reduce the run time
+  // of this test.
   NO_FATALS(sentry_authz_provider_->Stop());
   FLAGS_sentry_service_rpc_addresses = sentry_->address().ToString();
   FLAGS_sentry_service_send_timeout_seconds = AllowSlowTests() ? 5 : 2;
   FLAGS_sentry_service_recv_timeout_seconds = AllowSlowTests() ? 5 : 2;
-  sentry_authz_provider_.reset(new SentryAuthzProvider());
+  sentry_authz_provider_.reset(new SentryAuthzProvider);
   ASSERT_OK(sentry_authz_provider_->Start());
 
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   TSentryPrivilege privilege = GetDatabasePrivilege("db", "METADATA");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser));
 
   // Shutdown Sentry and try a few operations.
   ASSERT_OK(StopSentry());
 
   Status s = sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser);
-  EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
+  if (CachingEnabled()) {
+    EXPECT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  } else {
+    EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
+  }
 
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user");
   EXPECT_TRUE(s.IsNetworkError()) << s.ToString();
@@ -662,17 +745,25 @@ TEST_F(SentryAuthzProviderTest, TestReconnect) {
   });
 
   privilege = GetDatabasePrivilege("db", "DROP");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser));
 
   // Pause Sentry and try a few operations.
   ASSERT_OK(sentry_->Pause());
 
   s = sentry_authz_provider_->AuthorizeDropTable("db.table", kTestUser);
-  EXPECT_TRUE(s.IsTimedOut()) << s.ToString();
+  if (CachingEnabled()) {
+    EXPECT_TRUE(s.ok()) << s.ToString();
+  } else {
+    EXPECT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
 
   s = sentry_authz_provider_->AuthorizeGetTableMetadata("db.table", kTestUser);
-  EXPECT_TRUE(s.IsTimedOut()) << s.ToString();
+  if (CachingEnabled()) {
+    EXPECT_TRUE(s.ok()) << s.ToString();
+  } else {
+    EXPECT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
 
   // Resume Sentry and ensure that the same operations succeed.
   ASSERT_OK(sentry_->Resume());
@@ -682,41 +773,15 @@ TEST_F(SentryAuthzProviderTest, TestReconnect) {
   });
 }
 
-TEST_F(SentryAuthzProviderTest, TestInvalidAction) {
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
-  TSentryPrivilege privilege = GetDatabasePrivilege("db", "invalid");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
-  // User has privileges with invalid action cannot operate on the table.
-  Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
-  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-}
-
-TEST_F(SentryAuthzProviderTest, TestInvalidAuthzScope) {
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
-  TSentryPrivilege privilege = GetDatabasePrivilege("db", "ALL");
-  privilege.__set_privilegeScope("invalid");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
-  // User has privileges with invalid authorizable scope cannot operate
-  // on the table.
-  Status s = sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser);
-  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-}
-
-// Ensures Sentry privileges are case insensitive.
-TEST_F(SentryAuthzProviderTest, TestPrivilegeCaseSensitivity) {
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
-  TSentryPrivilege privilege = GetDatabasePrivilege("db", "create");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
-  ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("DB.table", kTestUser, kTestUser));
-}
-
 // Test to ensure the authorization hierarchy rule of SentryAuthzProvider
 // works as expected.
-class TestAuthzHierarchy : public SentryAuthzProviderTest,
-                           public ::testing::WithParamInterface<SentryAuthorizableScope::Scope> {};
+class TestAuthzHierarchy :
+    public SentryAuthzProviderTest,
+    public ::testing::WithParamInterface<SentryAuthorizableScope::Scope> {
+};
 
 TEST_P(TestAuthzHierarchy, TestAuthorizableScope) {
-  SentryAuthorizableScope::Scope scope = GetParam();
+  const SentryAuthorizableScope::Scope scope = GetParam();
   const string action = "ALL";
   const string db = "database";
   const string tbl = "table";
@@ -759,35 +824,44 @@ TEST_P(TestAuthzHierarchy, TestAuthorizableScope) {
   // Privilege with higher scope on the hierarchy can imply privileges
   // with lower scope on the hierarchy.
   for (const auto& privilege : higher_hierarchy_privs) {
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
-    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+    ASSERT_OK(CreateRoleAndAddToGroups());
+    ASSERT_OK(AlterRoleGrantPrivilege(privilege));
     ASSERT_OK(sentry_authz_provider_->Authorize(scope, SentryAction::Action::ALL,
                                                 Substitute("$0.$1", db, tbl), kTestUser));
-    ASSERT_OK(DropRole(sentry_client_.get(), kRoleName));
+    ASSERT_OK(DropRole());
   }
 
   // Privilege with lower scope on the hierarchy cannot imply privileges
   // with higher scope on the hierarchy.
   for (const auto& privilege : lower_hierarchy_privs) {
-    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
-    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+    ASSERT_OK(CreateRoleAndAddToGroups());
+    ASSERT_OK(AlterRoleGrantPrivilege(privilege));
     Status s = sentry_authz_provider_->Authorize(scope, SentryAction::Action::ALL,
                                                  Substitute("$0.$1", db, tbl), kTestUser);
     ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-    ASSERT_OK(DropRole(sentry_client_.get(), kRoleName));
+    ASSERT_OK(DropRole());
   }
 }
 
 INSTANTIATE_TEST_CASE_P(AuthzCombinations, TestAuthzHierarchy,
-                       // Scope::COLUMN is excluded since column scope for table
-                       // authorizable doesn't make sense.
-                       ::testing::Values(SentryAuthorizableScope::Scope::SERVER,
-                                         SentryAuthorizableScope::Scope::DATABASE,
-                                         SentryAuthorizableScope::Scope::TABLE));
+    // Scope::COLUMN is excluded since column scope for table
+    // authorizable doesn't make sense.
+    ::testing::Values(SentryAuthorizableScope::Scope::SERVER,
+                      SentryAuthorizableScope::Scope::DATABASE,
+                      SentryAuthorizableScope::Scope::TABLE));
 
 // Test to verify the functionality of metrics in HA Sentry client used in
 // SentryAuthzProvider to communicate with Sentry.
-TEST_F(SentryAuthzProviderTest, TestSentryClientMetrics) {
+class TestSentryClientMetrics : public SentryAuthzProviderTest {
+ public:
+  bool CachingEnabled() const override {
+    // For simplicity, scenarios of this test doesn't use caching. The scenarios
+    // track updates of HaClient metrics upon issuing RPCs to Sentry.
+    return false;
+  }
+};
+
+TEST_F(TestSentryClientMetrics, Basic) {
   ASSERT_EQ(0, GetTasksSuccessful());
   ASSERT_EQ(0, GetTasksFailedFatal());
   ASSERT_EQ(0, GetTasksFailedNonFatal());
@@ -845,9 +919,9 @@ TEST_F(SentryAuthzProviderTest, TestSentryClientMetrics) {
   sentry_authz_provider_.reset(new SentryAuthzProvider(metric_entity_));
   ASSERT_OK(sentry_authz_provider_->Start());
 
-  ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+  ASSERT_OK(CreateRoleAndAddToGroups());
   const auto privilege = GetDatabasePrivilege("db", "create");
-  ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
 
   // Pause Sentry and try to send an RPC, expecting it to time out.
   ASSERT_OK(sentry_->Pause());
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_authz_provider.cc
index dfb6895..12f1cc1 100644
--- a/src/kudu/master/sentry_authz_provider.cc
+++ b/src/kudu/master/sentry_authz_provider.cc
@@ -17,141 +17,42 @@
 
 #include "kudu/master/sentry_authz_provider.h"
 
-#include <memory>
-#include <ostream>
-#include <type_traits>
-#include <unordered_map>
 #include <unordered_set>
 #include <utility>
-#include <vector>
 
-#include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
-#include "kudu/common/table_util.h"
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/sentry_client_metrics.h"
+#include "kudu/master/sentry_privileges_fetcher.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/sentry/sentry_action.h"
-#include "kudu/sentry/sentry_client.h"
-#include "kudu/sentry/sentry_policy_service_types.h"
-#include "kudu/thrift/client.h"
-#include "kudu/thrift/ha_client_metrics.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/slice.h"
+#include "kudu/sentry/sentry_authorizable_scope.h"
 
-using sentry::TListSentryPrivilegesRequest;
-using sentry::TListSentryPrivilegesResponse;
-using sentry::TSentryAuthorizable;
-using sentry::TSentryGrantOption;
-using sentry::TSentryPrivilege;
+DECLARE_string(sentry_service_rpc_addresses);
+
+using kudu::security::ColumnPrivilegePB;
+using kudu::security::TablePrivilegePB;
+using kudu::sentry::SentryAction;
+using kudu::sentry::SentryAuthorizableScope;
 using std::string;
-using std::unordered_map;
 using std::unordered_set;
-using std::vector;
-
-DEFINE_string(sentry_service_rpc_addresses, "",
-              "Comma-separated list of RPC addresses of the Sentry service(s). When "
-              "set, Sentry integration is enabled, fine-grained access control is "
-              "enforced in the master, and clients are issued authorization tokens. "
-              "Must match the value of the sentry.service.client.server.rpc-addresses "
-              "option in the Sentry server configuration.");
-TAG_FLAG(sentry_service_rpc_addresses, experimental);
-
-DEFINE_string(server_name, "server1",
-              "Configures which server namespace the Kudu instance belongs to for defining "
-              "server-level privileges in Sentry. Used to distinguish a particular Kudu "
-              "cluster in case of a multi-cluster setup. Must match the value of the "
-              "hive.sentry.server option in the HiveServer2 configuration, and the value "
-              "of the --server_name in Impala configuration.");
-TAG_FLAG(server_name, experimental);
-
-DEFINE_string(kudu_service_name, "kudu",
-              "The service name of the Kudu server. Must match the service name "
-              "used for Kudu server of sentry.service.admin.group option in the "
-              "Sentry server configuration.");
-TAG_FLAG(kudu_service_name, experimental);
-
-DEFINE_string(sentry_service_kerberos_principal, "sentry",
-              "The service principal of the Sentry server. Must match the primary "
-              "(user) portion of sentry.service.server.principal option in the "
-              "Sentry server configuration.");
-TAG_FLAG(sentry_service_kerberos_principal, experimental);
-
-DEFINE_string(sentry_service_security_mode, "kerberos",
-              "Configures whether Thrift connections to the Sentry server use "
-              "SASL (Kerberos) security. Must match the value of the "
-              "‘sentry.service.security.mode’ option in the Sentry server "
-              "configuration.");
-TAG_FLAG(sentry_service_security_mode, experimental);
-
-DEFINE_int32(sentry_service_retry_count, 1,
-             "The number of times that Sentry operations will retry after "
-             "encountering retriable failures, such as network errors.");
-TAG_FLAG(sentry_service_retry_count, advanced);
-TAG_FLAG(sentry_service_retry_count, experimental);
-
-DEFINE_int32(sentry_service_send_timeout_seconds, 60,
-             "Configures the socket send timeout, in seconds, for Thrift "
-             "connections to the Sentry server.");
-TAG_FLAG(sentry_service_send_timeout_seconds, advanced);
-TAG_FLAG(sentry_service_send_timeout_seconds, experimental);
-
-DEFINE_int32(sentry_service_recv_timeout_seconds, 60,
-             "Configures the socket receive timeout, in seconds, for Thrift "
-             "connections to the Sentry server.");
-TAG_FLAG(sentry_service_recv_timeout_seconds, advanced);
-TAG_FLAG(sentry_service_recv_timeout_seconds, experimental);
-
-DEFINE_int32(sentry_service_conn_timeout_seconds, 60,
-             "Configures the socket connect timeout, in seconds, for Thrift "
-             "connections to the Sentry server.");
-TAG_FLAG(sentry_service_conn_timeout_seconds, advanced);
-TAG_FLAG(sentry_service_conn_timeout_seconds, experimental);
-
-DEFINE_int32(sentry_service_max_message_size_bytes, 100 * 1024 * 1024,
-             "Maximum size of Sentry objects that can be received by the "
-             "Sentry client in bytes. Must match the value of the "
-             "sentry.policy.client.thrift.max.message.size option in the "
-             "Sentry server configuration.");
-TAG_FLAG(sentry_service_max_message_size_bytes, advanced);
-TAG_FLAG(sentry_service_max_message_size_bytes, experimental);
-
 using strings::Substitute;
 
 namespace kudu {
-
-using security::ColumnPrivilegePB;
-using security::TablePrivilegePB;
-using sentry::SentryAction;
-using sentry::SentryAuthorizableScope;
-using sentry::AuthorizableScopesSet;
-using sentry::SentryClient;
-
 namespace master {
 
-// Validates the sentry_service_rpc_addresses gflag.
-static bool ValidateAddresses(const char* flag_name, const string& addresses) {
-  vector<HostPort> host_ports;
-  Status s = HostPort::ParseStringsWithScheme(addresses,
-                                              SentryClient::kDefaultSentryPort,
-                                              &host_ports);
-  if (!s.ok()) {
-    LOG(ERROR) << "invalid flag " << flag_name << ": " << s.ToString();
-  }
-  return s.ok();
-}
-DEFINE_validator(sentry_service_rpc_addresses, &ValidateAddresses);
+namespace {
 
-bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_scope,
-                                     SentryAction::Action required_action,
-                                     bool requires_all_with_grant) const {
+// Whether the given privileges 'privileges_branch' allows for the specified
+// action ('required_action') in the specified scope ('required_scope')
+// with GRANT ALL option, if any required ('requires_all_with_grant').
+bool IsActionAllowed(SentryAction::Action required_action,
+                     SentryAuthorizableScope::Scope required_scope,
+                     bool requires_all_with_grant,
+                     const SentryPrivilegesBranch& privileges_branch) {
   // In general, a privilege implies another when:
   // 1. the authorizable from the former implies the authorizable from the latter
   //    (authorizable with a higher scope on the hierarchy can imply authorizables
@@ -174,6 +75,7 @@ bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_sco
   // to action and grant option. Otherwise, privilege escalation can happen.
   SentryAction action(required_action);
   SentryAuthorizableScope scope(required_scope);
+  const auto& privileges = privileges_branch.privileges();
   for (const auto& privilege : privileges) {
     // A grant option cannot imply the other if the latter is set but the
     // former is not.
@@ -182,8 +84,8 @@ bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_sco
     }
     // Both privilege scope and action need to imply the other.
     if (SentryAuthorizableScope(privilege.scope).Implies(scope)) {
-      for (const auto& granted_action : privilege.granted_privileges) {
-        if (SentryAction(granted_action).Implies(action)) {
+      for (const auto& allowed_action : privilege.allowed_actions) {
+        if (SentryAction(allowed_action).Implies(action)) {
           return true;
         }
       }
@@ -192,14 +94,11 @@ bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_sco
   return false;
 }
 
+} // anonymous namespace
+
 SentryAuthzProvider::SentryAuthzProvider(
     scoped_refptr<MetricEntity> metric_entity)
-    : metric_entity_(std::move(metric_entity)) {
-  if (metric_entity_) {
-    std::unique_ptr<SentryClientMetrics> metrics(
-        new SentryClientMetrics(metric_entity_));
-    ha_client_.SetMetrics(std::move(metrics));
-  }
+    : fetcher_(std::move(metric_entity)) {
 }
 
 SentryAuthzProvider::~SentryAuthzProvider() {
@@ -207,69 +106,17 @@ SentryAuthzProvider::~SentryAuthzProvider() {
 }
 
 Status SentryAuthzProvider::Start() {
-  vector<HostPort> addresses;
-  RETURN_NOT_OK(HostPort::ParseStringsWithScheme(FLAGS_sentry_service_rpc_addresses,
-                                                 SentryClient::kDefaultSentryPort,
-                                                 &addresses));
-
-  thrift::ClientOptions options;
-  options.enable_kerberos = boost::iequals(FLAGS_sentry_service_security_mode, "kerberos");
-  options.service_principal = FLAGS_sentry_service_kerberos_principal;
-  options.send_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_send_timeout_seconds);
-  options.recv_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_recv_timeout_seconds);
-  options.conn_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_conn_timeout_seconds);
-  options.max_buf_size = FLAGS_sentry_service_max_message_size_bytes;
-  options.retry_count = FLAGS_sentry_service_retry_count;
-  return ha_client_.Start(std::move(addresses), std::move(options));
+  return fetcher_.Start();
 }
 
 void SentryAuthzProvider::Stop() {
-  ha_client_.Stop();
+  fetcher_.Stop();
 }
 
 bool SentryAuthzProvider::IsEnabled() {
   return !FLAGS_sentry_service_rpc_addresses.empty();
 }
 
-namespace {
-
-// Returns an authorizable based on the table identifier (in the format
-// <database-name>.<table-name>) and the given scope.
-Status GetAuthorizable(const string& table_ident,
-                       SentryAuthorizableScope::Scope scope,
-                       TSentryAuthorizable* authorizable) {
-  Slice database;
-  Slice table;
-  // We should only ever request privileges from Sentry for authorizables of
-  // scope equal to or higher than 'TABLE'.
-  DCHECK_NE(scope, SentryAuthorizableScope::Scope::COLUMN);
-  switch (scope) {
-    case SentryAuthorizableScope::Scope::TABLE:
-      RETURN_NOT_OK(ParseHiveTableIdentifier(table_ident, &database, &table));
-      DCHECK(!table.empty());
-      authorizable->__set_table(table.ToString());
-      FALLTHROUGH_INTENDED;
-    case SentryAuthorizableScope::Scope::DATABASE:
-      if (database.empty() && table.empty()) {
-        RETURN_NOT_OK(ParseHiveTableIdentifier(table_ident, &database, &table));
-      }
-      DCHECK(!database.empty());
-      authorizable->__set_db(database.ToString());
-      FALLTHROUGH_INTENDED;
-    case SentryAuthorizableScope::Scope::SERVER:
-      authorizable->__set_server(FLAGS_server_name);
-      break;
-    default:
-      LOG(FATAL) << "unsupported SentryAuthorizableScope: "
-                 << sentry::ScopeToString(scope);
-      break;
-  }
-
-  return Status::OK();
-}
-
-} // anonymous namespace
-
 Status SentryAuthzProvider::AuthorizeCreateTable(const string& table_name,
                                                  const string& user,
                                                  const string& owner) {
@@ -330,223 +177,6 @@ Status SentryAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
                    table_name, user);
 }
 
-const AuthorizableScopesSet& SentryAuthzProvider::ExpectedEmptyFields(
-    SentryAuthorizableScope::Scope scope) {
-  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::DATABASE,
-                                                    SentryAuthorizableScope::TABLE,
-                                                    SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::TABLE,
-                                                SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kColumnFields{};
-  switch (scope) {
-    case SentryAuthorizableScope::SERVER:
-      return kServerFields;
-    case SentryAuthorizableScope::DATABASE:
-      return kDbFields;
-    case SentryAuthorizableScope::TABLE:
-      return kTableFields;
-    case SentryAuthorizableScope::COLUMN:
-      return kColumnFields;
-    default:
-      LOG(DFATAL) << "not reachable";
-  }
-  return kColumnFields;
-}
-
-const AuthorizableScopesSet& SentryAuthzProvider::ExpectedNonEmptyFields(
-    SentryAuthorizableScope::Scope scope) {
-  AuthorizableScopesSet expected_nonempty_fields;
-  static const AuthorizableScopesSet kColumnFields{ SentryAuthorizableScope::SERVER,
-                                                    SentryAuthorizableScope::DATABASE,
-                                                    SentryAuthorizableScope::TABLE,
-                                                    SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::SERVER,
-                                                   SentryAuthorizableScope::DATABASE,
-                                                   SentryAuthorizableScope::TABLE };
-  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::SERVER,
-                                                SentryAuthorizableScope::DATABASE };
-  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::SERVER };
-  switch (scope) {
-    case SentryAuthorizableScope::COLUMN:
-      return kColumnFields;
-    case SentryAuthorizableScope::TABLE:
-      return kTableFields;
-    case SentryAuthorizableScope::DATABASE:
-      return kDbFields;
-    case SentryAuthorizableScope::SERVER:
-      return kServerFields;
-    default:
-      LOG(DFATAL) << "not reachable";
-  }
-  return kColumnFields;
-}
-
-bool SentryAuthzProvider::SentryPrivilegeIsWellFormed(
-    const TSentryPrivilege& privilege,
-    const TSentryAuthorizable& requested_authorizable,
-    SentryAuthorizableScope::Scope* scope,
-    SentryAction::Action* action) {
-  DCHECK_EQ(FLAGS_server_name, requested_authorizable.server);
-  DCHECK(!requested_authorizable.server.empty());
-  DCHECK(requested_authorizable.column.empty());
-
-  // A requested table must be accompanied by a database.
-  bool authorizable_has_db = !requested_authorizable.db.empty();
-  bool authorizable_has_table = !requested_authorizable.table.empty();
-  DCHECK((authorizable_has_db && authorizable_has_table) || !authorizable_has_table);
-
-  // Ignore anything that isn't a Kudu-related privilege.
-  SentryAuthorizableScope granted_scope;
-  SentryAction granted_action;
-  Status s = SentryAuthorizableScope::FromString(privilege.privilegeScope, &granted_scope)
-      .AndThen([&] {
-        return SentryAction::FromString(privilege.action, &granted_action);
-      });
-  if (!s.ok()) {
-    return false;
-  }
-
-  // Make sure that there aren't extraneous fields set in the privilege.
-  for (const auto& empty_field : ExpectedEmptyFields(granted_scope.scope())) {
-    switch (empty_field) {
-      case SentryAuthorizableScope::COLUMN:
-        if (!privilege.columnName.empty()) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::TABLE:
-        if (!privilege.tableName.empty()) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::DATABASE:
-        if (!privilege.dbName.empty()) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::SERVER:
-        if (!privilege.serverName.empty()) {
-          return false;
-        }
-        break;
-      default:
-        LOG(DFATAL) << Substitute("Granted privilege has invalid scope: $0",
-                                  sentry::ScopeToString(granted_scope.scope()));
-    }
-  }
-  // Make sure that all expected fields are set, and that they match those in
-  // the requested authorizable.
-  for (const auto& nonempty_field : ExpectedNonEmptyFields(granted_scope.scope())) {
-    switch (nonempty_field) {
-      case SentryAuthorizableScope::COLUMN:
-        if (!privilege.__isset.columnName || privilege.columnName.empty()) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::TABLE:
-        if (!privilege.__isset.tableName || privilege.tableName.empty() ||
-            (authorizable_has_table &&
-             !boost::iequals(privilege.tableName, requested_authorizable.table))) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::DATABASE:
-        if (!privilege.__isset.dbName || privilege.dbName.empty() ||
-            (authorizable_has_db &&
-             !boost::iequals(privilege.dbName, requested_authorizable.db))) {
-          return false;
-        }
-        break;
-      case SentryAuthorizableScope::SERVER:
-        if (privilege.serverName.empty() ||
-            !boost::iequals(privilege.serverName, requested_authorizable.server)) {
-          return false;
-        }
-        break;
-      default:
-        LOG(DFATAL) << Substitute("Granted privilege has invalid scope: $0",
-                                  sentry::ScopeToString(granted_scope.scope()));
-    }
-  }
-  *scope = granted_scope.scope();
-  *action = granted_action.action();
-  return true;
-}
-
-namespace {
-
-// Returns a unique string key for the given authorizable, at the given scope.
-// The authorizable must be a well-formed at the given scope.
-string GetKey(const string& server, const string& db, const string& table, const string& column,
-              SentryAuthorizableScope::Scope scope) {
-  DCHECK(!server.empty());
-  switch (scope) {
-    case SentryAuthorizableScope::SERVER:
-      return server;
-    case SentryAuthorizableScope::DATABASE:
-      DCHECK(!db.empty());
-      return Substitute("$0/$1", server, db);
-    case SentryAuthorizableScope::TABLE:
-      DCHECK(!db.empty() && !table.empty());
-      return Substitute("$0/$1/$2", server, db, table);
-    case SentryAuthorizableScope::COLUMN:
-      DCHECK(!db.empty() && !table.empty() && !column.empty());
-      return Substitute("$0/$1/$2/$3", server, db, table, column);
-    default:
-      LOG(DFATAL) << "not reachable";
-  }
-  return "";
-}
-
-} // anonymous namespace
-
-Status SentryAuthzProvider::GetSentryPrivileges(SentryAuthorizableScope::Scope scope,
-                                                const string& table_name,
-                                                const string& user,
-                                                SentryPrivilegesBranch* privileges) {
-  TSentryAuthorizable requested_authorizable;
-  RETURN_NOT_OK(GetAuthorizable(table_name, scope, &requested_authorizable));
-
-  TListSentryPrivilegesRequest request;
-  request.__set_requestorUserName(FLAGS_kudu_service_name);
-  request.__set_principalName(user);
-  request.__set_authorizableHierarchy(requested_authorizable);
-  TListSentryPrivilegesResponse response;
-  RETURN_NOT_OK(ha_client_.Execute(
-      [&] (SentryClient* client) {
-        return client->ListPrivilegesByUser(request, &response);
-      }));
-  unordered_map<string, AuthorizablePrivileges> privileges_map;
-  for (const auto& privilege_resp : response.privileges) {
-    SentryAuthorizableScope::Scope granted_scope;
-    SentryAction::Action granted_action;
-    if (!SentryPrivilegeIsWellFormed(privilege_resp, requested_authorizable,
-                                     &granted_scope, &granted_action)) {
-      if (VLOG_IS_ON(1)) {
-        std::ostringstream os;
-        privilege_resp.printTo(os);
-        VLOG(1) << Substitute("Ignoring privilege response: $0", os.str());
-      }
-      continue;
-    }
-    const auto& db = privilege_resp.dbName;
-    const auto& table = privilege_resp.tableName;
-    const auto& column = privilege_resp.columnName;
-    const string authorizable_key = GetKey(privilege_resp.serverName, db, table, column,
-                                           granted_scope);
-    AuthorizablePrivileges& privilege = LookupOrInsert(&privileges_map, authorizable_key,
-        AuthorizablePrivileges(granted_scope, db, table, column));
-    InsertIfNotPresent(&privilege.granted_privileges, granted_action);
-    if ((granted_action == SentryAction::ALL || granted_action == SentryAction::OWNER) &&
-        privilege_resp.grantOption == TSentryGrantOption::ENABLED) {
-      privilege.all_with_grant = true;
-    }
-  }
-  EmplaceValuesFromMap(std::move(privileges_map), &privileges->privileges);
-  return Status::OK();
-}
-
 Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name,
                                                  const string& user,
                                                  const SchemaPB& schema_pb,
@@ -567,16 +197,16 @@ Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name,
   // than parsing them from Sentry privileges every time. This is tricky
   // because the column-level privileges depend on the input schema, which may
   // be different upon subsequent calls to this function.
-  SentryPrivilegesBranch privileges;
-  RETURN_NOT_OK(GetSentryPrivileges(SentryAuthorizableScope::TABLE, table_name,
-                                    user, &privileges));
+  SentryPrivilegesBranch privileges_branch;
+  RETURN_NOT_OK(fetcher_.GetSentryPrivileges(
+      SentryAuthorizableScope::TABLE, table_name, user, &privileges_branch));
   unordered_set<string> scannable_col_names;
   static const SentryAuthorizableScope kTableScope(SentryAuthorizableScope::TABLE);
-  for (const auto& privilege : privileges.privileges) {
+  for (const auto& privilege : privileges_branch.privileges()) {
     if (SentryAuthorizableScope(privilege.scope).Implies(kTableScope)) {
       // Pull out any privileges at the table scope or higher.
-      if (ContainsKey(privilege.granted_privileges, SentryAction::ALL) ||
-          ContainsKey(privilege.granted_privileges, SentryAction::OWNER)) {
+      if (ContainsKey(privilege.allowed_actions, SentryAction::ALL) ||
+          ContainsKey(privilege.allowed_actions, SentryAction::OWNER)) {
         // Generate privilege with everything.
         pb->set_delete_privilege(true);
         pb->set_insert_privilege(true);
@@ -584,22 +214,22 @@ Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name,
         pb->set_update_privilege(true);
         return Status::OK();
       }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::DELETE)) {
+      if (ContainsKey(privilege.allowed_actions, SentryAction::DELETE)) {
         pb->set_delete_privilege(true);
       }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::INSERT)) {
+      if (ContainsKey(privilege.allowed_actions, SentryAction::INSERT)) {
         pb->set_insert_privilege(true);
       }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::SELECT)) {
+      if (ContainsKey(privilege.allowed_actions, SentryAction::SELECT)) {
         pb->set_scan_privilege(true);
       }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::UPDATE)) {
+      if (ContainsKey(privilege.allowed_actions, SentryAction::UPDATE)) {
         pb->set_update_privilege(true);
       }
     } else if (!pb->scan_privilege() &&
-               (ContainsKey(privilege.granted_privileges, SentryAction::ALL) ||
-                ContainsKey(privilege.granted_privileges, SentryAction::OWNER) ||
-                ContainsKey(privilege.granted_privileges, SentryAction::SELECT))) {
+               (ContainsKey(privilege.allowed_actions, SentryAction::ALL) ||
+                ContainsKey(privilege.allowed_actions, SentryAction::OWNER) ||
+                ContainsKey(privilege.allowed_actions, SentryAction::SELECT))) {
       // Pull out any scan privileges at the column scope.
       DCHECK_EQ(SentryAuthorizableScope::COLUMN, privilege.scope);
       DCHECK(!privilege.column_name.empty());
@@ -628,8 +258,8 @@ Status SentryAuthzProvider::Authorize(SentryAuthorizableScope::Scope scope,
   }
 
   SentryPrivilegesBranch privileges;
-  RETURN_NOT_OK(GetSentryPrivileges(scope, table_ident, user, &privileges));
-  if (privileges.Implies(scope, action, require_grant_option)) {
+  RETURN_NOT_OK(fetcher_.GetSentryPrivileges(scope, table_ident, user, &privileges));
+  if (IsActionAllowed(action, scope, require_grant_option, privileges)) {
     return Status::OK();
   }
 
diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_authz_provider.h
index 1a1efa2..9eff699 100644
--- a/src/kudu/master/sentry_authz_provider.h
+++ b/src/kudu/master/sentry_authz_provider.h
@@ -17,30 +17,19 @@
 
 #pragma once
 
-#include <ostream>
 #include <string>
-#include <utility>
-#include <vector>
 
-#include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/master/authz_provider.h"
+#include "kudu/master/sentry_privileges_fetcher.h"
 #include "kudu/sentry/sentry_action.h"
 #include "kudu/sentry/sentry_authorizable_scope.h"
-#include "kudu/sentry/sentry_client.h"
-#include "kudu/thrift/client.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 
-namespace sentry {
-class TSentryAuthorizable;
-class TSentryPrivilege;
-} // namespace sentry
-
 namespace kudu {
 
 class SchemaPB;
@@ -51,69 +40,6 @@ class TablePrivilegePB;
 
 namespace master {
 
-// Utility struct to facilitate evaluating the privileges of a given
-// authorizable. This is preferred to using Sentry's Thrift responses directly,
-// since useful information has already been parsed to generate this struct
-// (e.g. the SentryActions and scope).
-struct AuthorizablePrivileges {
-  AuthorizablePrivileges(sentry::SentryAuthorizableScope::Scope scope,
-                         std::string db,
-                         std::string table,
-                         std::string column)
-    : scope(scope),
-      db_name(std::move(db)),
-      table_name(std::move(table)),
-      column_name(std::move(column)) {
-#ifndef NDEBUG
-    switch (scope) {
-      case sentry::SentryAuthorizableScope::COLUMN:
-        CHECK(!column_name.empty());
-        FALLTHROUGH_INTENDED;
-      case sentry::SentryAuthorizableScope::TABLE:
-        CHECK(!table_name.empty());
-        FALLTHROUGH_INTENDED;
-      case sentry::SentryAuthorizableScope::DATABASE:
-        CHECK(!db_name.empty());
-        break;
-      case sentry::SentryAuthorizableScope::SERVER:
-        break;
-      default:
-        LOG(FATAL) << "not reachable";
-    }
-#endif
-  }
-
-  // Whether the privilege 'ALL' or 'OWNER' has been granted with Sentry's
-  // grant option enabled. Note that the grant option can be granted on any
-  // action, but for Kudu, we only use it with 'ALL' or 'OWNER'.
-  bool all_with_grant = false;
-
-  // The scope of the authorizable being granted privileges.
-  const sentry::SentryAuthorizableScope::Scope scope;
-
-  // The set of actions on which privileges are granted.
-  sentry::SentryActionsSet granted_privileges;
-
-  // The fields of the authorizable.
-  std::string db_name;
-  std::string table_name;
-  std::string column_name;
-};
-
-// A representation of the Sentry privilege hierarchy branch for a single table
-// (including privileges for the table's ancestors and descendents) for a
-// single user.
-struct SentryPrivilegesBranch {
-  // Set of privileges are granted.
-  std::vector<AuthorizablePrivileges> privileges;
-
-  // Returns whether or not this implies the given action and scope for the
-  // given table.
-  bool Implies(sentry::SentryAuthorizableScope::Scope required_scope,
-               sentry::SentryAction::Action required_action,
-               bool requires_all_with_grant) const;
-};
-
 // An implementation of AuthzProvider that connects to the Sentry service
 // for authorization metadata and allows or denies the actions performed by
 // users based on the metadata.
@@ -135,7 +61,7 @@ class SentryAuthzProvider : public AuthzProvider {
 
   // The following authorizing methods will fail if:
   //   - the operation is not authorized
-  //   - the Sentry service is unreachable
+  //   - the Sentry service is unreachable (when privilege caching is disabled)
   //   - Sentry fails to resolve the group mapping of the user
   //   - the specified '--kudu_service_name' is a non-admin user in Sentry
   // TODO(hao): add early failure recognition when SENTRY-2440 is done.
@@ -162,45 +88,11 @@ class SentryAuthzProvider : public AuthzProvider {
                               security::TablePrivilegePB* pb) override WARN_UNUSED_RESULT;
 
  private:
-  friend class SentryAuthzProviderFilterPrivilegesTest;
-  FRIEND_TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed);
+  friend class SentryAuthzProviderTest;
   FRIEND_TEST(TestAuthzHierarchy, TestAuthorizableScope);
   FRIEND_TEST(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterInvalidResponses);
   FRIEND_TEST(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterValidResponses);
 
-  // Utility function to determine whether the given privilege is a well-formed
-  // possibly Kudu-related privilege describing a descendent or ancestor of the
-  // requested authorizable in the Sentry hierarchy tree, i.e. it:
-  // - has a Kudu-related action (e.g. ALL, INSERT, UPDATE, etc.),
-  // - has a Kudu-related authorizable scope (e.g. SERVER, DATABASE, etc.),
-  // - all fields of equal or higher scope to the privilege's scope are set;
-  //   none lower are set, and
-  // - all fields that are set match those set by the input authorizable.
-  static bool SentryPrivilegeIsWellFormed(
-      const ::sentry::TSentryPrivilege& privilege,
-      const ::sentry::TSentryAuthorizable& requested_authorizable,
-      sentry::SentryAuthorizableScope::Scope* scope,
-      sentry::SentryAction::Action* action);
-
-  // Returns the set of scope fields expected to be non-empty in a Sentry
-  // response with the given authorizable scope. All fields of equal or higher
-  // scope are expected to be set.
-  static const sentry::AuthorizableScopesSet& ExpectedNonEmptyFields(
-      sentry::SentryAuthorizableScope::Scope scope);
-
-  // Returns the set of scope fields expected to be empty in a Sentry response
-  // with the given authorizable scope. All fields of lower scope are expected
-  // to be empty.
-  static const sentry::AuthorizableScopesSet& ExpectedEmptyFields(
-      sentry::SentryAuthorizableScope::Scope scope);
-
-  // Fetches the user's privileges from Sentry for the authorizable specified
-  // by the given table and scope.
-  Status GetSentryPrivileges(sentry::SentryAuthorizableScope::Scope scope,
-                             const std::string& table_name,
-                             const std::string& user,
-                             SentryPrivilegesBranch* privileges);
-
   // Checks if the user can perform an action on the table identifier (in the format
   // <database-name>.<table-name>), based on the given authorizable scope and the
   // grant option. Note that the authorizable scope should be equal or higher than
@@ -215,8 +107,10 @@ class SentryAuthzProvider : public AuthzProvider {
                    const std::string& user,
                    bool require_grant_option = false);
 
-  scoped_refptr<MetricEntity> metric_entity_;
-  thrift::HaClient<sentry::SentryClient> ha_client_;
+  // An instance of utility class that provides interface to search for
+  // required privileges through the information received from Sentry.
+  // The fetcher can optionally cache the received information.
+  SentryPrivilegesFetcher fetcher_;
 };
 
 } // namespace master
diff --git a/src/kudu/master/sentry_privileges_cache_metrics.cc b/src/kudu/master/sentry_privileges_cache_metrics.cc
new file mode 100644
index 0000000..65549b0
--- /dev/null
+++ b/src/kudu/master/sentry_privileges_cache_metrics.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/master/sentry_privileges_cache_metrics.h"
+
+#include "kudu/util/metrics.h"
+
+METRIC_DEFINE_counter(server, sentry_privileges_cache_inserts,
+                      "Sentry Privileges Cache Inserts",
+                      kudu::MetricUnit::kEntries,
+                      "Number of entries inserted in the cache");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_lookups,
+                      "Sentry Privileges Cache Lookups",
+                      kudu::MetricUnit::kEntries,
+                      "Number of entries looked up from the cache");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_evictions,
+                      "Sentry Privileges Cache Evictions",
+                      kudu::MetricUnit::kEntries,
+                      "Number of entries evicted from the cache");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_evictions_expired,
+                      "Sentry Privileges Cache Evictions of Expired Entries",
+                      kudu::MetricUnit::kEntries,
+                      "Number of entries that had already expired upon "
+                      "eviction from the cache");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_misses,
+                      "Sentry Privileges Cache Misses",
+                      kudu::MetricUnit::kEntries,
+                      "Number of lookups that didn't find a cached entry");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_hits,
+                      "Sentry Privileges Cache Hits",
+                      kudu::MetricUnit::kEntries,
+                      "Number of lookups that found a cached entry");
+METRIC_DEFINE_counter(server, sentry_privileges_cache_hits_expired,
+                      "Sentry Privileges Cache Hits of Expired Entries",
+                      kudu::MetricUnit::kEntries,
+                      "Number of lookups that found an entry, but the entry "
+                      "had already expired at the time of lookup");
+METRIC_DEFINE_gauge_uint64(server, sentry_privileges_cache_memory_usage,
+                           "Sentry Privileges Cache Memory Usage",
+                           kudu::MetricUnit::kBytes,
+                           "Memory consumed by the cache");
+
+namespace kudu {
+namespace master {
+
+#define MINIT(member, x) member = METRIC_##x.Instantiate(metric_entity)
+#define GINIT(member, x) member = METRIC_##x.Instantiate(metric_entity, 0)
+SentryPrivilegesCacheMetrics::SentryPrivilegesCacheMetrics(
+    const scoped_refptr<MetricEntity>& metric_entity) {
+  MINIT(inserts, sentry_privileges_cache_inserts);
+  MINIT(lookups, sentry_privileges_cache_lookups);
+  MINIT(evictions, sentry_privileges_cache_evictions);
+  MINIT(evictions_expired, sentry_privileges_cache_evictions_expired);
+  MINIT(cache_hits_caching, sentry_privileges_cache_hits);
+  MINIT(cache_hits_expired, sentry_privileges_cache_hits_expired);
+  MINIT(cache_misses_caching, sentry_privileges_cache_misses);
+  GINIT(cache_usage, sentry_privileges_cache_memory_usage);
+}
+#undef MINIT
+#undef GINIT
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/sentry_privileges_cache_metrics.h b/src/kudu/master/sentry_privileges_cache_metrics.h
new file mode 100644
index 0000000..3f21ca9
--- /dev/null
+++ b/src/kudu/master/sentry_privileges_cache_metrics.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/ttl_cache_metrics.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+namespace master {
+
+struct SentryPrivilegesCacheMetrics : public TTLCacheMetrics {
+  explicit SentryPrivilegesCacheMetrics(
+      const scoped_refptr<MetricEntity>& metric_entity);
+};
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_privileges_fetcher.cc
similarity index 53%
copy from src/kudu/master/sentry_authz_provider.cc
copy to src/kudu/master/sentry_privileges_fetcher.cc
index dfb6895..0e3aad1 100644
--- a/src/kudu/master/sentry_authz_provider.cc
+++ b/src/kudu/master/sentry_privileges_fetcher.cc
@@ -15,46 +15,40 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/master/sentry_authz_provider.h"
+#include "kudu/master/sentry_privileges_fetcher.h"
 
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <ostream>
 #include <type_traits>
 #include <unordered_map>
-#include <unordered_set>
-#include <utility>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
 #include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
-#include "kudu/common/common.pb.h"
 #include "kudu/common/table_util.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/sentry_client_metrics.h"
-#include "kudu/security/token.pb.h"
+#include "kudu/master/sentry_privileges_cache_metrics.h"
 #include "kudu/sentry/sentry_action.h"
+#include "kudu/sentry/sentry_authorizable_scope.h"
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/thrift/client.h"
 #include "kudu/thrift/ha_client_metrics.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/malloc.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/slice.h"
-
-using sentry::TListSentryPrivilegesRequest;
-using sentry::TListSentryPrivilegesResponse;
-using sentry::TSentryAuthorizable;
-using sentry::TSentryGrantOption;
-using sentry::TSentryPrivilege;
-using std::string;
-using std::unordered_map;
-using std::unordered_set;
-using std::vector;
+#include "kudu/util/ttl_cache_metrics.h"
 
 DEFINE_string(sentry_service_rpc_addresses, "",
               "Comma-separated list of RPC addresses of the Sentry service(s). When "
@@ -123,17 +117,43 @@ DEFINE_int32(sentry_service_max_message_size_bytes, 100 * 1024 * 1024,
 TAG_FLAG(sentry_service_max_message_size_bytes, advanced);
 TAG_FLAG(sentry_service_max_message_size_bytes, experimental);
 
+// TODO(aserbin): provide some reasonable default value for the
+//                --sentry_privileges_cache_capacity_mb flag. Maybe, make it
+//                a multiple of FLAG_sentry_service_max_message_size_bytes ?
+DEFINE_uint32(sentry_privileges_cache_capacity_mb, 256,
+              "Capacity for the authz cache, in MiBytes. The cache stores "
+              "information received from Sentry. A value of 0 means Sentry "
+              "responses will not be cached.");
+TAG_FLAG(sentry_privileges_cache_capacity_mb, advanced);
+
+DEFINE_uint32(sentry_privileges_cache_ttl_factor, 10,
+              "Factor of multiplication for the authz token validity interval "
+              "defined by --authz_token_validity_seconds flag. The result of "
+              "the multiplication of this factor and authz token validity "
+              "defines the TTL of entries in the authz cache.");
+TAG_FLAG(sentry_privileges_cache_ttl_factor, advanced);
+TAG_FLAG(sentry_privileges_cache_ttl_factor, experimental);
+
+DECLARE_int64(authz_token_validity_seconds);
+DECLARE_string(kudu_service_name);
+DECLARE_string(server_name);
+
+using kudu::sentry::AuthorizableScopesSet;
+using kudu::sentry::SentryAction;
+using kudu::sentry::SentryAuthorizableScope;
+using kudu::sentry::SentryClient;
+using sentry::TListSentryPrivilegesRequest;
+using sentry::TListSentryPrivilegesResponse;
+using sentry::TSentryAuthorizable;
+using sentry::TSentryGrantOption;
+using sentry::TSentryPrivilege;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
-
-using security::ColumnPrivilegePB;
-using security::TablePrivilegePB;
-using sentry::SentryAction;
-using sentry::SentryAuthorizableScope;
-using sentry::AuthorizableScopesSet;
-using sentry::SentryClient;
-
 namespace master {
 
 // Validates the sentry_service_rpc_addresses gflag.
@@ -149,88 +169,6 @@ static bool ValidateAddresses(const char* flag_name, const string& addresses) {
 }
 DEFINE_validator(sentry_service_rpc_addresses, &ValidateAddresses);
 
-bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_scope,
-                                     SentryAction::Action required_action,
-                                     bool requires_all_with_grant) const {
-  // In general, a privilege implies another when:
-  // 1. the authorizable from the former implies the authorizable from the latter
-  //    (authorizable with a higher scope on the hierarchy can imply authorizables
-  //    with a lower scope on the hierarchy, but not vice versa), and
-  // 2. the action from the former implies the action from the latter, and
-  // 3. grant option from the former implies the grant option from the latter.
-  //
-  // See org.apache.sentry.policy.common.CommonPrivilege. Note that policy validation
-  // in CommonPrivilege also allows wildcard authorizable matching. For example,
-  // authorizable 'server=server1->db=*' can imply authorizable 'server=server1'.
-  // However, wildcard authorizable granting is neither practical nor useful (semantics
-  // of granting such privilege are not supported in Apache Hive, Impala and Hue. And
-  // 'server=server1->db=*' has exactly the same meaning as 'server=server1'). Therefore,
-  // wildcard authorizable matching is dropped in this implementation.
-  //
-  // Moreover, because ListPrivilegesByUser lists all Sentry privileges granted to the
-  // user that match the authorizable of each scope in the input authorizable hierarchy,
-  // privileges with lower scope will also be returned in the response. This contradicts
-  // rule (1) mentioned above. Therefore, we need to validate privilege scope, in addition
-  // to action and grant option. Otherwise, privilege escalation can happen.
-  SentryAction action(required_action);
-  SentryAuthorizableScope scope(required_scope);
-  for (const auto& privilege : privileges) {
-    // A grant option cannot imply the other if the latter is set but the
-    // former is not.
-    if (requires_all_with_grant && !privilege.all_with_grant) {
-      continue;
-    }
-    // Both privilege scope and action need to imply the other.
-    if (SentryAuthorizableScope(privilege.scope).Implies(scope)) {
-      for (const auto& granted_action : privilege.granted_privileges) {
-        if (SentryAction(granted_action).Implies(action)) {
-          return true;
-        }
-      }
-    }
-  }
-  return false;
-}
-
-SentryAuthzProvider::SentryAuthzProvider(
-    scoped_refptr<MetricEntity> metric_entity)
-    : metric_entity_(std::move(metric_entity)) {
-  if (metric_entity_) {
-    std::unique_ptr<SentryClientMetrics> metrics(
-        new SentryClientMetrics(metric_entity_));
-    ha_client_.SetMetrics(std::move(metrics));
-  }
-}
-
-SentryAuthzProvider::~SentryAuthzProvider() {
-  Stop();
-}
-
-Status SentryAuthzProvider::Start() {
-  vector<HostPort> addresses;
-  RETURN_NOT_OK(HostPort::ParseStringsWithScheme(FLAGS_sentry_service_rpc_addresses,
-                                                 SentryClient::kDefaultSentryPort,
-                                                 &addresses));
-
-  thrift::ClientOptions options;
-  options.enable_kerberos = boost::iequals(FLAGS_sentry_service_security_mode, "kerberos");
-  options.service_principal = FLAGS_sentry_service_kerberos_principal;
-  options.send_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_send_timeout_seconds);
-  options.recv_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_recv_timeout_seconds);
-  options.conn_timeout = MonoDelta::FromSeconds(FLAGS_sentry_service_conn_timeout_seconds);
-  options.max_buf_size = FLAGS_sentry_service_max_message_size_bytes;
-  options.retry_count = FLAGS_sentry_service_retry_count;
-  return ha_client_.Start(std::move(addresses), std::move(options));
-}
-
-void SentryAuthzProvider::Stop() {
-  ha_client_.Stop();
-}
-
-bool SentryAuthzProvider::IsEnabled() {
-  return !FLAGS_sentry_service_rpc_addresses.empty();
-}
-
 namespace {
 
 // Returns an authorizable based on the table identifier (in the format
@@ -268,121 +206,257 @@ Status GetAuthorizable(const string& table_ident,
   return Status::OK();
 }
 
-} // anonymous namespace
-
-Status SentryAuthzProvider::AuthorizeCreateTable(const string& table_name,
-                                                 const string& user,
-                                                 const string& owner) {
-  // If the table is being created with a different owner than the user,
-  // then the creating user must have 'ALL ON DATABASE' with grant. See
-  // design doc in [SENTRY-2151](https://issues.apache.org/jira/browse/SENTRY-2151).
-  //
-  // Otherwise, table creation requires 'CREATE ON DATABASE' privilege.
-  SentryAction::Action action;
-  bool grant_option;
-  if (user == owner) {
-    action = SentryAction::Action::CREATE;
-    grant_option = false;
-  } else {
-    action = SentryAction::Action::ALL;
-    grant_option = true;
+// A utility class to help with Sentry privilege scoping, generating
+// sequence of keys to lookup corresponding entries in the cache.
+// For example, for user 'U' and authorizable { server:S, db:D, table:T }
+// (in pseudo-form) the sequence of keys is { 'U/S', 'U/S/D', 'U/S/D/T' }.
+class AuthzInfoKey {
+ public:
+  AuthzInfoKey(const string& user,
+               const ::sentry::TSentryAuthorizable& authorizable);
+
+  const string& GetFlattenedKey() {
+    // The flattened key is the very last element of the key_sequence_.
+    return key_sequence_.back();
   }
-  return Authorize(SentryAuthorizableScope::Scope::DATABASE, action,
-                   table_name, user, grant_option);
-}
 
-Status SentryAuthzProvider::AuthorizeDropTable(const string& table_name,
-                                               const string& user) {
-  // Table deletion requires 'DROP ON TABLE' privilege.
-  return Authorize(SentryAuthorizableScope::Scope::TABLE,
-                   SentryAction::Action::DROP,
-                   table_name, user);
+ private:
+  // Generate the key lookup sequence: a sequence of keys to use while
+  // looking up corresponding entry in the authz cache based on the
+  // hierarchy of scoping for Sentry authorizables.
+  static vector<string> GenerateKeySequence(
+      const string& user, const ::sentry::TSentryAuthorizable& authorizable);
+
+  const vector<string> key_sequence_;
+};
+
+AuthzInfoKey::AuthzInfoKey(const string& user,
+                           const ::sentry::TSentryAuthorizable& authorizable)
+    : key_sequence_(GenerateKeySequence(user, authorizable)) {
+  CHECK(!key_sequence_.empty());
 }
 
-Status SentryAuthzProvider::AuthorizeAlterTable(const string& old_table,
-                                                const string& new_table,
-                                                const string& user) {
-  // For table alteration (without table rename) requires 'ALTER ON TABLE'
-  // privilege;
-  // For table alteration (with table rename) requires
-  //  1. 'ALL ON TABLE <old-table>',
-  //  2. 'CREATE ON DATABASE <new-database>'.
-  // See [SENTRY-2264](https://issues.apache.org/jira/browse/SENTRY-2264).
-  // TODO(hao): add inline hierarchy validation to avoid multiple RPCs.
-  if (old_table == new_table) {
-    return Authorize(SentryAuthorizableScope::Scope::TABLE,
-                     SentryAction::Action::ALTER,
-                     old_table, user);
+// TODO(aserbin): consider other ways of encoding a key for an object
+vector<string> AuthzInfoKey::GenerateKeySequence(
+    const string& user, const ::sentry::TSentryAuthorizable& authorizable) {
+  DCHECK(!user.empty());
+  DCHECK(!authorizable.server.empty());
+  if (!authorizable.__isset.db || authorizable.db.empty()) {
+    return {
+      Substitute("/$0/$1", user, authorizable.server),
+    };
+  }
+
+  if (!authorizable.__isset.table || authorizable.table.empty()) {
+    auto k0 = Substitute("/$0/$1", user, authorizable.server);
+    auto k1 = Substitute("/$0/$1", k0, authorizable.db);
+    return { std::move(k0), std::move(k1), };
   }
-  RETURN_NOT_OK(Authorize(SentryAuthorizableScope::Scope::TABLE,
-                          SentryAction::Action::ALL,
-                          old_table, user));
-  return Authorize(SentryAuthorizableScope::Scope::DATABASE,
-                   SentryAction::Action::CREATE,
-                   new_table, user);
-}
 
-Status SentryAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
-                                                      const string& user) {
-  // Retrieving table metadata requires 'METADATA ON TABLE' privilege.
-  return Authorize(SentryAuthorizableScope::Scope::TABLE,
-                   SentryAction::Action::METADATA,
-                   table_name, user);
+  if (!authorizable.__isset.column || authorizable.column.empty()) {
+    auto k0 = Substitute("/$0/$1", user, authorizable.server);
+    auto k1 = Substitute("/$0/$1", k0, authorizable.db);
+    auto k2 = Substitute("/$0/$1", k1, authorizable.table);
+    return { std::move(k0), std::move(k1), std::move(k2), };
+  }
+
+  auto k0 = Substitute("/$0/$1", user, authorizable.server);
+  auto k1 = Substitute("/$0/$1", k0, authorizable.db);
+  auto k2 = Substitute("/$0/$1", k1, authorizable.table);
+  auto k3 = Substitute("/$0/$1", k2, authorizable.column);
+  return { std::move(k0), std::move(k1), std::move(k2), std::move(k3), };
 }
 
-const AuthorizableScopesSet& SentryAuthzProvider::ExpectedEmptyFields(
-    SentryAuthorizableScope::Scope scope) {
-  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::DATABASE,
-                                                    SentryAuthorizableScope::TABLE,
-                                                    SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::TABLE,
-                                                SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kColumnFields{};
+// Returns a unique string key for the given authorizable, at the given scope.
+// The authorizable must be a well-formed at the given scope.
+string GetKey(const string& server,
+              const string& db,
+              const string& table,
+              const string& column,
+              SentryAuthorizableScope::Scope scope) {
+  DCHECK(!server.empty());
   switch (scope) {
     case SentryAuthorizableScope::SERVER:
-      return kServerFields;
+      return server;
     case SentryAuthorizableScope::DATABASE:
-      return kDbFields;
+      DCHECK(!db.empty());
+      return Substitute("$0/$1", server, db);
     case SentryAuthorizableScope::TABLE:
-      return kTableFields;
+      DCHECK(!db.empty());
+      DCHECK(!table.empty());
+      return Substitute("$0/$1/$2", server, db, table);
     case SentryAuthorizableScope::COLUMN:
-      return kColumnFields;
+      DCHECK(!db.empty());
+      DCHECK(!table.empty());
+      DCHECK(!column.empty());
+      return Substitute("$0/$1/$2/$3", server, db, table, column);
     default:
       LOG(DFATAL) << "not reachable";
+      break;
   }
-  return kColumnFields;
+  return "";
 }
 
-const AuthorizableScopesSet& SentryAuthzProvider::ExpectedNonEmptyFields(
-    SentryAuthorizableScope::Scope scope) {
-  AuthorizableScopesSet expected_nonempty_fields;
-  static const AuthorizableScopesSet kColumnFields{ SentryAuthorizableScope::SERVER,
-                                                    SentryAuthorizableScope::DATABASE,
-                                                    SentryAuthorizableScope::TABLE,
-                                                    SentryAuthorizableScope::COLUMN };
-  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::SERVER,
-                                                   SentryAuthorizableScope::DATABASE,
-                                                   SentryAuthorizableScope::TABLE };
-  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::SERVER,
-                                                SentryAuthorizableScope::DATABASE };
-  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::SERVER };
-  switch (scope) {
-    case SentryAuthorizableScope::COLUMN:
-      return kColumnFields;
-    case SentryAuthorizableScope::TABLE:
-      return kTableFields;
-    case SentryAuthorizableScope::DATABASE:
-      return kDbFields;
-    case SentryAuthorizableScope::SERVER:
-      return kServerFields;
-    default:
-      LOG(DFATAL) << "not reachable";
+} // anonymous namespace
+
+
+SentryPrivilegesBranch::SentryPrivilegesBranch(
+    const ::sentry::TSentryAuthorizable& authorizable,
+    const TListSentryPrivilegesResponse& response) {
+  DoInit(authorizable, response);
+}
+
+size_t SentryPrivilegesBranch::memory_footprint() const {
+  size_t res = kudu_malloc_usable_size(this);
+  // This is a simple approximation: the exact information could be available
+  // from the allocator of std::vector and std::string.
+  res += privileges_.capacity() * sizeof(AuthorizablePrivileges);
+  for (const auto& p : privileges_) {
+    res += p.db_name.capacity();
+    res += p.table_name.capacity();
+    res += p.column_name.capacity();
+    res += sizeof(decltype(p.allowed_actions));
   }
-  return kColumnFields;
+  return res;
 }
 
-bool SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+void SentryPrivilegesBranch::DoInit(
+    const ::sentry::TSentryAuthorizable& authorizable,
+    const TListSentryPrivilegesResponse& response) {
+  unordered_map<string, AuthorizablePrivileges> privileges_map;
+  for (const auto& privilege_resp : response.privileges) {
+    SentryAuthorizableScope::Scope scope;
+    SentryAction::Action action;
+    if (!SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
+        privilege_resp, authorizable, &scope, &action)) {
+      if (VLOG_IS_ON(1)) {
+        std::ostringstream os;
+        privilege_resp.printTo(os);
+        VLOG(1) << Substitute("Ignoring privilege response: $0", os.str());
+      }
+      continue;
+    }
+    const auto& db = privilege_resp.dbName;
+    const auto& table = privilege_resp.tableName;
+    const auto& column = privilege_resp.columnName;
+    const string authorizable_key = GetKey(privilege_resp.serverName,
+                                           db, table, column, scope);
+    auto& privilege = LookupOrInsert(&privileges_map, authorizable_key,
+        AuthorizablePrivileges(scope, db, table, column));
+    InsertIfNotPresent(&privilege.allowed_actions, action);
+    if (action == SentryAction::ALL || action == SentryAction::OWNER) {
+      privilege.all_with_grant =
+          (privilege_resp.grantOption == TSentryGrantOption::ENABLED);
+    }
+    if (VLOG_IS_ON(1)) {
+      std::ostringstream os;
+      privilege_resp.printTo(os);
+      if (action != SentryAction::ALL && action != SentryAction::OWNER &&
+          privilege_resp.grantOption == TSentryGrantOption::ENABLED) {
+        VLOG(1) << "ignoring ENABLED grant option for unexpected action: "
+                << static_cast<int16_t>(action);
+      }
+    }
+  }
+  EmplaceValuesFromMap(std::move(privileges_map), &privileges_);
+}
+
+SentryPrivilegesFetcher::SentryPrivilegesFetcher(
+    scoped_refptr<MetricEntity> metric_entity)
+    : metric_entity_(std::move(metric_entity)) {
+  if (metric_entity_) {
+    std::unique_ptr<SentryClientMetrics> metrics(
+        new SentryClientMetrics(metric_entity_));
+    sentry_client_.SetMetrics(std::move(metrics));
+  }
+}
+
+Status SentryPrivilegesFetcher::Start() {
+  ResetCache();
+
+  vector<HostPort> addresses;
+  RETURN_NOT_OK(HostPort::ParseStringsWithScheme(
+      FLAGS_sentry_service_rpc_addresses,
+      SentryClient::kDefaultSentryPort,
+      &addresses));
+
+  thrift::ClientOptions options;
+  options.enable_kerberos = boost::iequals(
+      FLAGS_sentry_service_security_mode, "kerberos");
+  options.service_principal =
+      FLAGS_sentry_service_kerberos_principal;
+  options.send_timeout = MonoDelta::FromSeconds(
+      FLAGS_sentry_service_send_timeout_seconds);
+  options.recv_timeout = MonoDelta::FromSeconds(
+      FLAGS_sentry_service_recv_timeout_seconds);
+  options.conn_timeout = MonoDelta::FromSeconds(
+      FLAGS_sentry_service_conn_timeout_seconds);
+  options.max_buf_size =
+      FLAGS_sentry_service_max_message_size_bytes;
+  options.retry_count =
+      FLAGS_sentry_service_retry_count;
+
+  return sentry_client_.Start(std::move(addresses), std::move(options));
+}
+
+void SentryPrivilegesFetcher::Stop() {
+  sentry_client_.Stop();
+}
+
+// TODO(aserbin): change the signature to return a handle that keeps reference
+//                to either cache entry or SentryPrivilegesBranch allocated
+//                on heap, otherwise there is copying from a cache entry to
+//                the output parameter.
+Status SentryPrivilegesFetcher::GetSentryPrivileges(
+    SentryAuthorizableScope::Scope requested_scope,
+    const string& table_name,
+    const string& user,
+    SentryPrivilegesBranch* privileges) {
+  // TODO(aserbin): once only requests with scope TABLE are issued,
+  //                uncomment the CHECK_EQ() below.
+  // Don't query Sentry for authz scopes other than 'TABLE'.
+  //CHECK_EQ(SentryAuthorizableScope::TABLE, requested_scope);
+  SentryAuthorizableScope scope(requested_scope);
+  TSentryAuthorizable authorizable;
+  RETURN_NOT_OK(GetAuthorizable(table_name, scope.scope(), &authorizable));
+
+  AuthzInfoKey aggregate_key(user, authorizable);
+  const auto& key = aggregate_key.GetFlattenedKey();
+  typename AuthzInfoCache::EntryHandle handle;
+  if (PREDICT_TRUE(cache_)) {
+    handle = cache_->Get(key);
+  }
+  if (handle) {
+    *privileges = handle.value();
+    return Status::OK();
+  }
+
+  TListSentryPrivilegesResponse response;
+  RETURN_NOT_OK(FetchPrivilegesFromSentry(FLAGS_kudu_service_name,
+                                          user, authorizable, &response));
+  SentryPrivilegesBranch result(authorizable, response);
+  if (PREDICT_FALSE(!cache_)) {
+    *privileges = std::move(result);
+    return Status::OK();
+  }
+
+  // Put the result into the cache.
+  unique_ptr<SentryPrivilegesBranch> result_ptr(
+      new SentryPrivilegesBranch(result));
+  const auto result_footprint = result_ptr->memory_footprint() + key.capacity();
+  cache_->Put(key, std::move(result_ptr), result_footprint);
+  VLOG(2) << Substitute("cached entry of size $0 bytes for key '$1'",
+                        result_footprint, key);
+
+  *privileges = std::move(result);
+  return Status::OK();
+}
+
+// In addition to sanity checking of the contents of TSentryPrivilege in
+// 'privilege', this function has DCHECKs to spot programmer's errors
+// with regard to correctly setting fields of the 'requested_authorizable'
+// parameter.
+bool SentryPrivilegesFetcher::SentryPrivilegeIsWellFormed(
     const TSentryPrivilege& privilege,
     const TSentryAuthorizable& requested_authorizable,
     SentryAuthorizableScope::Scope* scope,
@@ -436,7 +510,8 @@ bool SentryAuthzProvider::SentryPrivilegeIsWellFormed(
     }
   }
   // Make sure that all expected fields are set, and that they match those in
-  // the requested authorizable.
+  // the requested authorizable. Sentry auhtorizables are case-insensitive
+  // due to the properties of Kudu-HMS integration.
   for (const auto& nonempty_field : ExpectedNonEmptyFields(granted_scope.scope())) {
     switch (nonempty_field) {
       case SentryAuthorizableScope::COLUMN:
@@ -474,176 +549,90 @@ bool SentryAuthzProvider::SentryPrivilegeIsWellFormed(
   return true;
 }
 
-namespace {
-
-// Returns a unique string key for the given authorizable, at the given scope.
-// The authorizable must be a well-formed at the given scope.
-string GetKey(const string& server, const string& db, const string& table, const string& column,
-              SentryAuthorizableScope::Scope scope) {
-  DCHECK(!server.empty());
+const AuthorizableScopesSet& SentryPrivilegesFetcher::ExpectedEmptyFields(
+    SentryAuthorizableScope::Scope scope) {
+  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::DATABASE,
+                                                    SentryAuthorizableScope::TABLE,
+                                                    SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::TABLE,
+                                                SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kColumnFields{};
   switch (scope) {
     case SentryAuthorizableScope::SERVER:
-      return server;
+      return kServerFields;
     case SentryAuthorizableScope::DATABASE:
-      DCHECK(!db.empty());
-      return Substitute("$0/$1", server, db);
+      return kDbFields;
     case SentryAuthorizableScope::TABLE:
-      DCHECK(!db.empty() && !table.empty());
-      return Substitute("$0/$1/$2", server, db, table);
+      return kTableFields;
     case SentryAuthorizableScope::COLUMN:
-      DCHECK(!db.empty() && !table.empty() && !column.empty());
-      return Substitute("$0/$1/$2/$3", server, db, table, column);
+      return kColumnFields;
     default:
       LOG(DFATAL) << "not reachable";
   }
-  return "";
+  return kColumnFields;
 }
 
-} // anonymous namespace
-
-Status SentryAuthzProvider::GetSentryPrivileges(SentryAuthorizableScope::Scope scope,
-                                                const string& table_name,
-                                                const string& user,
-                                                SentryPrivilegesBranch* privileges) {
-  TSentryAuthorizable requested_authorizable;
-  RETURN_NOT_OK(GetAuthorizable(table_name, scope, &requested_authorizable));
+const AuthorizableScopesSet& SentryPrivilegesFetcher::ExpectedNonEmptyFields(
+    SentryAuthorizableScope::Scope scope) {
+  static const AuthorizableScopesSet kColumnFields{ SentryAuthorizableScope::SERVER,
+                                                    SentryAuthorizableScope::DATABASE,
+                                                    SentryAuthorizableScope::TABLE,
+                                                    SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::SERVER,
+                                                   SentryAuthorizableScope::DATABASE,
+                                                   SentryAuthorizableScope::TABLE };
+  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::SERVER,
+                                                SentryAuthorizableScope::DATABASE };
+  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::SERVER };
+  switch (scope) {
+    case SentryAuthorizableScope::COLUMN:
+      return kColumnFields;
+    case SentryAuthorizableScope::TABLE:
+      return kTableFields;
+    case SentryAuthorizableScope::DATABASE:
+      return kDbFields;
+    case SentryAuthorizableScope::SERVER:
+      return kServerFields;
+    default:
+      LOG(DFATAL) << "not reachable";
+  }
+  return kColumnFields;
+}
 
+Status SentryPrivilegesFetcher::FetchPrivilegesFromSentry(
+    const string& service_name,
+    const string& user,
+    const TSentryAuthorizable& authorizable,
+    TListSentryPrivilegesResponse* response) {
   TListSentryPrivilegesRequest request;
-  request.__set_requestorUserName(FLAGS_kudu_service_name);
+  request.__set_requestorUserName(service_name);
   request.__set_principalName(user);
-  request.__set_authorizableHierarchy(requested_authorizable);
-  TListSentryPrivilegesResponse response;
-  RETURN_NOT_OK(ha_client_.Execute(
+  request.__set_authorizableHierarchy(authorizable);
+  return sentry_client_.Execute(
       [&] (SentryClient* client) {
-        return client->ListPrivilegesByUser(request, &response);
-      }));
-  unordered_map<string, AuthorizablePrivileges> privileges_map;
-  for (const auto& privilege_resp : response.privileges) {
-    SentryAuthorizableScope::Scope granted_scope;
-    SentryAction::Action granted_action;
-    if (!SentryPrivilegeIsWellFormed(privilege_resp, requested_authorizable,
-                                     &granted_scope, &granted_action)) {
-      if (VLOG_IS_ON(1)) {
-        std::ostringstream os;
-        privilege_resp.printTo(os);
-        VLOG(1) << Substitute("Ignoring privilege response: $0", os.str());
-      }
-      continue;
-    }
-    const auto& db = privilege_resp.dbName;
-    const auto& table = privilege_resp.tableName;
-    const auto& column = privilege_resp.columnName;
-    const string authorizable_key = GetKey(privilege_resp.serverName, db, table, column,
-                                           granted_scope);
-    AuthorizablePrivileges& privilege = LookupOrInsert(&privileges_map, authorizable_key,
-        AuthorizablePrivileges(granted_scope, db, table, column));
-    InsertIfNotPresent(&privilege.granted_privileges, granted_action);
-    if ((granted_action == SentryAction::ALL || granted_action == SentryAction::OWNER) &&
-        privilege_resp.grantOption == TSentryGrantOption::ENABLED) {
-      privilege.all_with_grant = true;
-    }
-  }
-  EmplaceValuesFromMap(std::move(privileges_map), &privileges->privileges);
-  return Status::OK();
+        return client->ListPrivilegesByUser(request, response);
+      });
 }
 
-Status SentryAuthzProvider::FillTablePrivilegePB(const string& table_name,
-                                                 const string& user,
-                                                 const SchemaPB& schema_pb,
-                                                 TablePrivilegePB* pb) {
-  DCHECK(pb);
-  DCHECK(pb->has_table_id());
-  if (AuthzProvider::IsTrustedUser(user)) {
-    pb->set_delete_privilege(true);
-    pb->set_insert_privilege(true);
-    pb->set_scan_privilege(true);
-    pb->set_update_privilege(true);
-    return Status::OK();
-  }
-  static ColumnPrivilegePB scan_col_privilege;
-  scan_col_privilege.set_scan_privilege(true);
-
-  // Note: it might seem like we could cache these TablePrivilegePBs rather
-  // than parsing them from Sentry privileges every time. This is tricky
-  // because the column-level privileges depend on the input schema, which may
-  // be different upon subsequent calls to this function.
-  SentryPrivilegesBranch privileges;
-  RETURN_NOT_OK(GetSentryPrivileges(SentryAuthorizableScope::TABLE, table_name,
-                                    user, &privileges));
-  unordered_set<string> scannable_col_names;
-  static const SentryAuthorizableScope kTableScope(SentryAuthorizableScope::TABLE);
-  for (const auto& privilege : privileges.privileges) {
-    if (SentryAuthorizableScope(privilege.scope).Implies(kTableScope)) {
-      // Pull out any privileges at the table scope or higher.
-      if (ContainsKey(privilege.granted_privileges, SentryAction::ALL) ||
-          ContainsKey(privilege.granted_privileges, SentryAction::OWNER)) {
-        // Generate privilege with everything.
-        pb->set_delete_privilege(true);
-        pb->set_insert_privilege(true);
-        pb->set_scan_privilege(true);
-        pb->set_update_privilege(true);
-        return Status::OK();
-      }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::DELETE)) {
-        pb->set_delete_privilege(true);
-      }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::INSERT)) {
-        pb->set_insert_privilege(true);
-      }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::SELECT)) {
-        pb->set_scan_privilege(true);
-      }
-      if (ContainsKey(privilege.granted_privileges, SentryAction::UPDATE)) {
-        pb->set_update_privilege(true);
-      }
-    } else if (!pb->scan_privilege() &&
-               (ContainsKey(privilege.granted_privileges, SentryAction::ALL) ||
-                ContainsKey(privilege.granted_privileges, SentryAction::OWNER) ||
-                ContainsKey(privilege.granted_privileges, SentryAction::SELECT))) {
-      // Pull out any scan privileges at the column scope.
-      DCHECK_EQ(SentryAuthorizableScope::COLUMN, privilege.scope);
-      DCHECK(!privilege.column_name.empty());
-      EmplaceIfNotPresent(&scannable_col_names, privilege.column_name);
-    }
-  }
-  // If we got any column-level scan privileges and we don't already have
-  // table-level scan privileges, set them now.
-  if (!pb->scan_privilege()) {
-    for (const auto& col : schema_pb.columns()) {
-      if (ContainsKey(scannable_col_names, col.name())) {
-        InsertIfNotPresent(pb->mutable_column_privileges(), col.id(), scan_col_privilege);
-      }
+Status SentryPrivilegesFetcher::ResetCache() {
+  const auto cache_capacity_bytes =
+      FLAGS_sentry_privileges_cache_capacity_mb * 1024 * 1024;
+  if (cache_capacity_bytes == 0) {
+    cache_.reset();
+  } else {
+    const auto ttl_sec = FLAGS_authz_token_validity_seconds *
+        FLAGS_sentry_privileges_cache_ttl_factor;
+    cache_.reset(new AuthzInfoCache(cache_capacity_bytes,
+                                    MonoDelta::FromSeconds(ttl_sec)));
+    if (metric_entity_) {
+      unique_ptr<SentryPrivilegesCacheMetrics> metrics(
+          new SentryPrivilegesCacheMetrics(metric_entity_));
+      cache_->SetMetrics(std::move(metrics));
     }
   }
   return Status::OK();
 }
 
-Status SentryAuthzProvider::Authorize(SentryAuthorizableScope::Scope scope,
-                                      SentryAction::Action action,
-                                      const string& table_ident,
-                                      const string& user,
-                                      bool require_grant_option) {
-  if (AuthzProvider::IsTrustedUser(user)) {
-    return Status::OK();
-  }
-
-  SentryPrivilegesBranch privileges;
-  RETURN_NOT_OK(GetSentryPrivileges(scope, table_ident, user, &privileges));
-  if (privileges.Implies(scope, action, require_grant_option)) {
-    return Status::OK();
-  }
-
-  // Log a warning if the action is not authorized for debugging purpose, and
-  // only return a generic error to users to avoid a side channel leak, e.g.
-  // whether table A exists.
-  LOG(WARNING) << Substitute("Action <$0> on table <$1> with authorizable scope "
-                             "<$2> is not permitted for user <$3>",
-                             sentry::ActionToString(action),
-                             table_ident,
-                             sentry::ScopeToString(scope),
-                             user);
-  return Status::NotAuthorized("unauthorized action");
-}
-
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_privileges_fetcher.h
similarity index 52%
copy from src/kudu/master/sentry_authz_provider.h
copy to src/kudu/master/sentry_privileges_fetcher.h
index 1a1efa2..23ef6b4 100644
--- a/src/kudu/master/sentry_authz_provider.h
+++ b/src/kudu/master/sentry_privileges_fetcher.h
@@ -17,50 +17,51 @@
 
 #pragma once
 
+#include <cstddef>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/master/authz_provider.h"
 #include "kudu/sentry/sentry_action.h"
 #include "kudu/sentry/sentry_authorizable_scope.h"
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/thrift/client.h"
+#include "kudu/util/bitset.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
+#include "kudu/util/ttl_cache.h"
 
 namespace sentry {
+class TListSentryPrivilegesResponse;
 class TSentryAuthorizable;
 class TSentryPrivilege;
-} // namespace sentry
+}  // namespace sentry
 
 namespace kudu {
-
-class SchemaPB;
-
-namespace security {
-class TablePrivilegePB;
-} // namespace security
-
 namespace master {
 
 // Utility struct to facilitate evaluating the privileges of a given
 // authorizable. This is preferred to using Sentry's Thrift responses directly,
 // since useful information has already been parsed to generate this struct
 // (e.g. the SentryActions and scope).
+// The 'server' field is omitted: everything is implicitly bound to a particular
+// Sentry instance which is the only authoritative source of authz information
+// for Kudu in the current model of AuthzProvider.
 struct AuthorizablePrivileges {
   AuthorizablePrivileges(sentry::SentryAuthorizableScope::Scope scope,
                          std::string db,
                          std::string table,
                          std::string column)
-    : scope(scope),
+    : all_with_grant(false),
+      scope(scope),
       db_name(std::move(db)),
       table_name(std::move(table)),
       column_name(std::move(column)) {
@@ -86,13 +87,13 @@ struct AuthorizablePrivileges {
   // Whether the privilege 'ALL' or 'OWNER' has been granted with Sentry's
   // grant option enabled. Note that the grant option can be granted on any
   // action, but for Kudu, we only use it with 'ALL' or 'OWNER'.
-  bool all_with_grant = false;
+  bool all_with_grant;
 
-  // The scope of the authorizable being granted privileges.
-  const sentry::SentryAuthorizableScope::Scope scope;
+  // The scope of the authorizable being granted the privileges.
+  sentry::SentryAuthorizableScope::Scope scope;
 
-  // The set of actions on which privileges are granted.
-  sentry::SentryActionsSet granted_privileges;
+  // The set of actions for which privileges are granted.
+  sentry::SentryActionsSet allowed_actions;
 
   // The fields of the authorizable.
   std::string db_name;
@@ -101,72 +102,68 @@ struct AuthorizablePrivileges {
 };
 
 // A representation of the Sentry privilege hierarchy branch for a single table
-// (including privileges for the table's ancestors and descendents) for a
-// single user.
-struct SentryPrivilegesBranch {
-  // Set of privileges are granted.
-  std::vector<AuthorizablePrivileges> privileges;
-
-  // Returns whether or not this implies the given action and scope for the
-  // given table.
-  bool Implies(sentry::SentryAuthorizableScope::Scope required_scope,
-               sentry::SentryAction::Action required_action,
-               bool requires_all_with_grant) const;
-};
-
-// An implementation of AuthzProvider that connects to the Sentry service
-// for authorization metadata and allows or denies the actions performed by
-// users based on the metadata.
-//
-// This class is thread-safe after Start() is called.
-class SentryAuthzProvider : public AuthzProvider {
+// (including privileges for the table's ancestors and descendents in the
+//  authz scope hierarchy) for a single user.
+class SentryPrivilegesBranch {
  public:
-  explicit SentryAuthzProvider(scoped_refptr<MetricEntity> metric_entity = {});
-
-  ~SentryAuthzProvider();
+  // Construct an empty instance: no information on privileges.
+  SentryPrivilegesBranch() = default;
 
-  // Start SentryAuthzProvider instance which connects to the Sentry service.
-  Status Start() override WARN_UNUSED_RESULT;
+  // Construct an instance for the specified 'authorizable' from 'response'.
+  SentryPrivilegesBranch(
+      const ::sentry::TSentryAuthorizable& authorizable,
+      const ::sentry::TListSentryPrivilegesResponse& response);
 
-  void Stop() override;
-
-  // Returns true if the SentryAuthzProvider should be enabled.
-  static bool IsEnabled();
+  // Accessor to the privileges information stored in the object.
+  const std::vector<AuthorizablePrivileges>& privileges() const {
+    return privileges_;
+  }
 
-  // The following authorizing methods will fail if:
-  //   - the operation is not authorized
-  //   - the Sentry service is unreachable
-  //   - Sentry fails to resolve the group mapping of the user
-  //   - the specified '--kudu_service_name' is a non-admin user in Sentry
-  // TODO(hao): add early failure recognition when SENTRY-2440 is done.
+  // Get estimation on amount of memory used (in bytes) to store this instance.
+  size_t memory_footprint() const;
 
-  Status AuthorizeCreateTable(const std::string& table_name,
-                              const std::string& user,
-                              const std::string& owner) override WARN_UNUSED_RESULT;
+ private:
+  // Utility function.
+  void DoInit(const ::sentry::TSentryAuthorizable& authorizable,
+              const ::sentry::TListSentryPrivilegesResponse& response);
 
-  Status AuthorizeDropTable(const std::string& table_name,
-                            const std::string& user) override WARN_UNUSED_RESULT;
+  // Set of privileges are granted.
+  std::vector<AuthorizablePrivileges> privileges_;
+};
 
-  // Note that the caller should normalize the table name if case sensitivity is not
-  // enforced for naming. e.g. when HMS integration is enabled.
-  Status AuthorizeAlterTable(const std::string& old_table,
-                             const std::string& new_table,
-                             const std::string& user) override WARN_UNUSED_RESULT;
+// A utility class to use in SentryAuthzProvider. This class provides an
+// interface for finding privileges granted to a user at some authz scope.
+// The authoritative source of the authz privileges information is Sentry,
+// where the Sentry-related parameters are specified via command line flags for
+// kudu-master binary (see the .cc file for available command line flags).
+//
+// Optionally, the fetcher can use TTL-based cache to store information
+// retrieved from Sentry, making it possible to reuse once fetched information
+// until corresponding cache entries expire.
+class SentryPrivilegesFetcher {
+ public:
+  explicit SentryPrivilegesFetcher(scoped_refptr<MetricEntity> metric_entity);
+  ~SentryPrivilegesFetcher() = default;
 
-  Status AuthorizeGetTableMetadata(const std::string& table_name,
-                                   const std::string& user) override WARN_UNUSED_RESULT;
+  // Start/stop the underlying Sentry client.
+  Status Start();
+  void Stop();
 
-  Status FillTablePrivilegePB(const std::string& table_name,
-                              const std::string& user,
-                              const SchemaPB& schema_pb,
-                              security::TablePrivilegePB* pb) override WARN_UNUSED_RESULT;
+  // Fetches the user's privileges from Sentry for the authorizable specified
+  // by the given table and scope. The result privileges might be served
+  // from the cache, if caching is enabled and corresponding entry exists
+  // in the cache.
+  Status GetSentryPrivileges(
+      sentry::SentryAuthorizableScope::Scope requested_scope,
+      const std::string& table_name,
+      const std::string& user,
+      SentryPrivilegesBranch* privileges);
 
  private:
   friend class SentryAuthzProviderFilterPrivilegesTest;
-  FRIEND_TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed);
-  FRIEND_TEST(TestAuthzHierarchy, TestAuthorizableScope);
-  FRIEND_TEST(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterInvalidResponses);
-  FRIEND_TEST(SentryAuthzProviderFilterPrivilegesScopeTest, TestFilterValidResponses);
+  friend class SentryAuthzProviderTest;
+  friend class SentryPrivilegesBranch;
+  FRIEND_TEST(SentryPrivilegesFetcherStaticTest, TestPrivilegesWellFormed);
 
   // Utility function to determine whether the given privilege is a well-formed
   // possibly Kudu-related privilege describing a descendent or ancestor of the
@@ -194,29 +191,35 @@ class SentryAuthzProvider : public AuthzProvider {
   static const sentry::AuthorizableScopesSet& ExpectedEmptyFields(
       sentry::SentryAuthorizableScope::Scope scope);
 
-  // Fetches the user's privileges from Sentry for the authorizable specified
-  // by the given table and scope.
-  Status GetSentryPrivileges(sentry::SentryAuthorizableScope::Scope scope,
-                             const std::string& table_name,
-                             const std::string& user,
-                             SentryPrivilegesBranch* privileges);
-
-  // Checks if the user can perform an action on the table identifier (in the format
-  // <database-name>.<table-name>), based on the given authorizable scope and the
-  // grant option. Note that the authorizable scope should be equal or higher than
-  // 'TABLE' scope.
+  // Sends a request to fetch privileges from Sentry for the given authorizable.
+  Status FetchPrivilegesFromSentry(
+      const std::string& service_name,
+      const std::string& user,
+      const ::sentry::TSentryAuthorizable& authorizable,
+      ::sentry::TListSentryPrivilegesResponse* response);
+
+  // Resets the authz cache. In addition to lifecycle-related methods like
+  // Start(), this method is also used by tests: if the authz information
+  // has been updated by the test, the cache needs to be invalidated.
   //
-  // If the operation is not authorized, returns Status::NotAuthorized().
-  // Note that the authorization process is case insensitive for the
-  // authorizables.
-  Status Authorize(sentry::SentryAuthorizableScope::Scope scope,
-                   sentry::SentryAction::Action action,
-                   const std::string& table_ident,
-                   const std::string& user,
-                   bool require_grant_option = false);
+  // NOTE: this method is not thread-safe and should not be called along with
+  //       concurrent authz requests.
+  Status ResetCache();
 
+  // Metric entity for registering metric gauges/counters.
   scoped_refptr<MetricEntity> metric_entity_;
-  thrift::HaClient<sentry::SentryClient> ha_client_;
+
+  // The authz scope hierarchy level that defines the narrowest authz scope
+  // for requests sent to Sentry. If not set, no broadening of authz privilege
+  // scope is made.
+  boost::optional<sentry::SentryAuthorizableScope> scope_depth_limit_;
+
+  // Client instance to communicate with Sentry.
+  thrift::HaClient<sentry::SentryClient> sentry_client_;
+
+  // The TTL cache to store information on privileges received from Sentry.
+  typedef TTLCache<std::string, SentryPrivilegesBranch> AuthzInfoCache;
+  std::unique_ptr<AuthzInfoCache> cache_;
 };
 
 } // namespace master
diff --git a/src/kudu/sentry/sentry_authorizable_scope.cc b/src/kudu/sentry/sentry_authorizable_scope.cc
index deac923..758666d 100644
--- a/src/kudu/sentry/sentry_authorizable_scope.cc
+++ b/src/kudu/sentry/sentry_authorizable_scope.cc
@@ -18,7 +18,6 @@
 #include "kudu/sentry/sentry_authorizable_scope.h"
 
 #include <cstdint>
-
 #include <ostream>
 #include <string>
 
@@ -36,7 +35,7 @@ namespace sentry {
 const char* ScopeToString(SentryAuthorizableScope::Scope scope) {
   switch (scope) {
     case SentryAuthorizableScope::Scope::UNINITIALIZED: return "UNINITIALIZED";
-    case SentryAuthorizableScope::Scope::SERVER: return kSever;
+    case SentryAuthorizableScope::Scope::SERVER: return kServer;
     case SentryAuthorizableScope::Scope::DATABASE: return kDatabase;
     case SentryAuthorizableScope::Scope::TABLE: return kTable;
     case SentryAuthorizableScope::Scope::COLUMN: return kColumn;
@@ -59,7 +58,7 @@ SentryAuthorizableScope::SentryAuthorizableScope(Scope scope)
 
 Status SentryAuthorizableScope::FromString(const string& str,
                                            SentryAuthorizableScope* scope) {
-  if (boost::iequals(str, kSever)) {
+  if (boost::iequals(str, kServer)) {
     scope->scope_ = Scope::SERVER;
   } else if (boost::iequals(str, kDatabase)) {
     scope->scope_ = Scope::DATABASE;
diff --git a/src/kudu/sentry/sentry_authorizable_scope.h b/src/kudu/sentry/sentry_authorizable_scope.h
index b0f6edc..f7d6dc8 100644
--- a/src/kudu/sentry/sentry_authorizable_scope.h
+++ b/src/kudu/sentry/sentry_authorizable_scope.h
@@ -17,10 +17,10 @@
 
 #pragma once
 
+#include <cstddef>
 #include <iosfwd>
 #include <string>
 
-#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/util/bitset.h"
 #include "kudu/util/status.h"
@@ -73,7 +73,7 @@ class SentryAuthorizableScope {
   Scope scope_;
 };
 
-static constexpr const char* const kSever = "SERVER";
+static constexpr const char* const kServer = "SERVER";
 static constexpr const char* const kDatabase = "DATABASE";
 static constexpr const char* const kTable = "TABLE";
 static constexpr const char* const kColumn = "COLUMN";