You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/04/05 01:50:12 UTC

[kudu] branch master updated: sentry: sanitize and parse privileges from Sentry

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a93f98  sentry: sanitize and parse privileges from Sentry
2a93f98 is described below

commit 2a93f98d71bedf045b7e4a1800e047f8498dee38
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Tue Apr 2 19:46:54 2019 -0700

    sentry: sanitize and parse privileges from Sentry
    
    Currently, we pass around the Thrift privileges received from Sentry,
    which can be both expensive memory-wise and cumbersome to use. This
    patch:
    - sanitizes the responses from Sentry, only keeping those that are
      well-formed and potentially Kudu-related,
    - stores them in a more ergonomic form, e.g. keeping around enums rather
      than strings for SentryActions, etc. This form may be updated in the
      future to facilitate privilege evaluation -- for now, my goal is just
      to make it easier to work with Sentry privileges,
    - encapsulates the above in an abstracted version of a Sentry response
      that corresponds to the hierarchy tree for a given table, with the
      hope that it will make changing the in-memory format more painless,
    - switches the SentryAuthorizableScope and SentryAction enum classes to
      enums, to avoid having to use the extra enum class typename
      everywhere (e.g. now SentryAuthorizableScope::SERVER instead of
      SentryAuthorizableScope::Scope::SERVER will suffice),
    - tests that the sanitization does what it purports to do,
    - tests authorizing "create tables" with OWNER privileges, due to an
      issue caught in review.
    
    Change-Id: Ib6de6814f99abfbee4f030298b74f21f4e7c729b
    Reviewed-on: http://gerrit.cloudera.org:8080/12919
    Tested-by: Kudu Jenkins
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/gutil/map-util.h                     |  22 +-
 src/kudu/master/sentry_authz_provider-test.cc | 319 ++++++++++++++++++++++++-
 src/kudu/master/sentry_authz_provider.cc      | 332 +++++++++++++++++++++-----
 src/kudu/master/sentry_authz_provider.h       | 110 +++++++++
 src/kudu/sentry/sentry_action.h               |   7 +-
 src/kudu/sentry/sentry_authorizable_scope.h   |   8 +-
 6 files changed, 714 insertions(+), 84 deletions(-)

diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index 212a078..1876547 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -829,6 +829,19 @@ void AppendValuesFromMap(const MapContainer& map_container,
   }
 }
 
+template <class MapContainer, class ValueContainer>
+void EmplaceValuesFromMap(MapContainer&& map_container,
+                          ValueContainer* value_container) {
+  CHECK(value_container != nullptr);
+  // See AppendKeysFromMap for why this is done.
+  if (value_container->empty()) {
+    value_container->reserve(map_container.size());
+  }
+  for (auto&& entry : map_container) {
+    value_container->emplace_back(std::move(entry.second));
+  }
+}
+
 // A more specialized overload of AppendValuesFromMap to optimize reallocations
 // for the common case in which we're appending values to a vector and hence
 // can (and sometimes should) call reserve() first.
@@ -839,14 +852,7 @@ void AppendValuesFromMap(const MapContainer& map_container,
 template <class MapContainer, class ValueType>
 void AppendValuesFromMap(const MapContainer& map_container,
                          std::vector<ValueType>* value_container) {
-  CHECK(value_container != NULL);
-  // See AppendKeysFromMap for why this is done.
-  if (value_container->empty()) {
-    value_container->reserve(map_container.size());
-  }
-  for (const auto& entry : map_container) {
-    value_container->push_back(entry.second);
-  }
+  EmplaceValuesFromMap(map_container, value_container);
 }
 
 // Compute and insert new value if it's absent from the map. Return a pair with a reference to the
diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc
index 35199a6..04aff81 100644
--- a/src/kudu/master/sentry_authz_provider-test.cc
+++ b/src/kudu/master/sentry_authz_provider-test.cc
@@ -18,13 +18,16 @@
 #include "kudu/master/sentry_authz_provider.h"
 
 #include <memory>
+#include <ostream>
 #include <string>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/sentry_authz_provider-test-base.h"
 #include "kudu/sentry/mini_sentry.h"
@@ -34,6 +37,8 @@
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -45,17 +50,20 @@ DECLARE_string(sentry_service_security_mode);
 DECLARE_string(server_name);
 DECLARE_string(trusted_user_acl);
 
+using sentry::TSentryAuthorizable;
 using sentry::TSentryGrantOption;
 using sentry::TSentryPrivilege;
+using std::string;
 using std::tuple;
 using std::unique_ptr;
-using std::string;
 using std::vector;
 using strings::Substitute;
 
 namespace kudu {
 
+using sentry::AuthorizableScopesSet;
 using sentry::SentryAction;
+using sentry::SentryActionsSet;
 using sentry::SentryTestBase;
 using sentry::SentryAuthorizableScope;
 
@@ -70,6 +78,70 @@ TEST(SentryAuthzProviderStaticTest, TestTrustedUserAcl) {
   ASSERT_FALSE(authz_provider.IsTrustedUser("untrusted"));
 }
 
+// Basic unit test for validations on ill-formed privileges.
+TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed) {
+  const string kDb = "db";
+  const string kTable = "table";
+  TSentryAuthorizable requested_authorizable;
+  requested_authorizable.__set_server(FLAGS_server_name);
+  requested_authorizable.__set_db(kDb);
+  requested_authorizable.__set_table(kTable);
+  TSentryPrivilege real_privilege = GetTablePrivilege(kDb, kTable, "ALL");
+  {
+    // Privilege with a bogus action set.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__set_action("NotAnAction");
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Privilege with a bogus authorizable scope set.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__set_privilegeScope("NotAnAuthorizableScope");
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Privilege with a valid, but unexpected scope for the set fields.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__set_privilegeScope("COLUMN");
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Privilege with a messed up scope field at a higher scope than that
+    // requested.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__set_dbName("NotTheActualDb");
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Privilege with a field set that isn't meant to be set at its scope.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__set_columnName("SomeColumn");
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Privilege with a missing field for its scope.
+    TSentryPrivilege privilege = real_privilege;
+    privilege.__isset.tableName = false;
+    ASSERT_FALSE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        privilege, requested_authorizable, /*scope=*/nullptr, /*action=*/nullptr));
+  }
+  {
+    // Finally, the correct table-level privilege.
+    SentryAuthorizableScope::Scope granted_scope;
+    SentryAction::Action granted_action;
+    real_privilege.printTo(LOG(INFO));
+    ASSERT_TRUE(SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+        real_privilege, requested_authorizable, &granted_scope, &granted_action));
+    ASSERT_EQ(SentryAuthorizableScope::TABLE, granted_scope);
+    ASSERT_EQ(SentryAction::ALL, granted_action);
+  }
+}
+
 class SentryAuthzProviderTest : public SentryTestBase {
  public:
   const char* const kTestUser = "test-user";
@@ -98,23 +170,232 @@ class SentryAuthzProviderTest : public SentryTestBase {
     return Status::OK();
   }
 
+  bool KerberosEnabled() const override {
+    return false;
+  }
+
  protected:
   unique_ptr<SentryAuthzProvider> sentry_authz_provider_;
 };
 
-// Tests to ensure SentryAuthzProvider enforces access control on tables as expected.
-// Parameterized by whether Kerberos should be enabled.
-class TestTableAuthorization : public SentryAuthzProviderTest,
-                               public ::testing::WithParamInterface<bool> {
+namespace {
+
+// Indicates different invalid privilege response types to be injected.
+enum class InvalidPrivilege {
+  // No error is injected.
+  NONE,
+
+  // The action string is set to something other than the expected action.
+  INCORRECT_ACTION,
+
+  // The scope string is set to something other than the expected scope.
+  INCORRECT_SCOPE,
+
+  // The 'serverName' field is set to something other than the authorizable's
+  // server name. Why just the server? This guarantees that a request for any
+  // authorizable scope will ignore such invalid privileges. E.g. say we
+  // instead granted an incorrect 'tableName'; assuming the dbName were still
+  // correct, a request at the database scope would correctly _not_ ignore the
+  // privilege. So to ensure that these InvalidPrivileges always yield
+  // privileges that are ignored, we exclusively butcher the 'server' field.
+  INCORRECT_SERVER,
+
+  // One of the scope fields (e.g. serverName, dbName, etc.) is unexpectedly
+  // missing or unexpectedly set. Note: Sentry servers don't allow an empty
+  // 'server' scope; if erasing the 'server' field, we'll instead set it to
+  // something other than the expected server.
+  FLIPPED_FIELD,
+};
+
+const SentryActionsSet kAllActions({
+  SentryAction::ALL,
+  SentryAction::METADATA,
+  SentryAction::SELECT,
+  SentryAction::INSERT,
+  SentryAction::UPDATE,
+  SentryAction::DELETE,
+  SentryAction::ALTER,
+  SentryAction::CREATE,
+  SentryAction::DROP,
+  SentryAction::OWNER,
+});
+
+constexpr const char* kDb = "db";
+constexpr const char* kTable = "table";
+constexpr const char* kColumn = "column";
+
+} // anonymous namespace
+
+class SentryAuthzProviderFilterResponsesTest :
+    public SentryAuthzProviderTest,
+    public ::testing::WithParamInterface<SentryAuthorizableScope::Scope> {
  public:
-  bool KerberosEnabled() const {
-    return GetParam();
+  SentryAuthzProviderFilterResponsesTest()
+      : prng_(SeedRandom()) {}
+
+  void SetUp() override {
+    SentryAuthzProviderTest::SetUp();
+    ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
+    full_authorizable_.server = FLAGS_server_name;
+    full_authorizable_.db = kDb;
+    full_authorizable_.table = kTable;
+    full_authorizable_.column = kColumn;
+  }
+
+  bool KerberosEnabled() const override {
+    return true;
   }
+
+  // Creates a Sentry privilege for the user based on the given action,
+  // the given scope, and the given authorizable that has all scope fields set.
+  // With all of the scope fields set in the authorizable, and a given scope,
+  // we can return an appropriate privilege for it, with tweaks indicated by
+  // 'invalid_privilege' to make the privilege invalid if desired.
+  TSentryPrivilege CreatePrivilege(const TSentryAuthorizable& full_authorizable,
+                                   const SentryAuthorizableScope& scope, const SentryAction& action,
+                                   InvalidPrivilege invalid_privilege = InvalidPrivilege::NONE) {
+    DCHECK(!full_authorizable.server.empty() && !full_authorizable.db.empty() &&
+          !full_authorizable.table.empty() && !full_authorizable.column.empty());
+    TSentryPrivilege privilege;
+    privilege.__set_action(invalid_privilege == InvalidPrivilege::INCORRECT_ACTION ?
+                           "foobar" : ActionToString(action.action()));
+    privilege.__set_privilegeScope(invalid_privilege == InvalidPrivilege::INCORRECT_SCOPE ?
+                                   "foobar" : ScopeToString(scope.scope()));
+
+    // Select a scope at which we'll mess up the privilege request's field.
+    AuthorizableScopesSet nonempty_fields =
+        SentryAuthzProvider::ExpectedNonEmptyFields(scope.scope());
+    if (invalid_privilege == InvalidPrivilege::FLIPPED_FIELD) {
+      static const AuthorizableScopesSet kMessUpCandidates = {
+        SentryAuthorizableScope::SERVER,
+        SentryAuthorizableScope::DATABASE,
+        SentryAuthorizableScope::TABLE,
+        SentryAuthorizableScope::COLUMN,
+      };
+      SentryAuthorizableScope::Scope field_to_mess_up =
+          SelectRandomElement<AuthorizableScopesSet, SentryAuthorizableScope::Scope, Random>(
+              kMessUpCandidates, &prng_);
+      if (ContainsKey(nonempty_fields, field_to_mess_up)) {
+        // Since Sentry servers don't allow empty 'server' fields in requests,
+        // rather than flipping the empty status of the field, inject an
+        // incorrect value for the field.
+        if (field_to_mess_up == SentryAuthorizableScope::SERVER) {
+          invalid_privilege = InvalidPrivilege::INCORRECT_SERVER;
+        } else {
+          nonempty_fields.erase(field_to_mess_up);
+        }
+      } else {
+        InsertOrDie(&nonempty_fields, field_to_mess_up);
+      }
+    }
+
+    // Fill in any fields we may need.
+    for (const auto& field : nonempty_fields) {
+      switch (field) {
+        case SentryAuthorizableScope::SERVER:
+          privilege.__set_serverName(invalid_privilege == InvalidPrivilege::INCORRECT_SERVER ?
+                                     "foobar" : full_authorizable.server);
+          break;
+        case SentryAuthorizableScope::DATABASE:
+          privilege.__set_dbName(full_authorizable.db);
+          break;
+        case SentryAuthorizableScope::TABLE:
+          privilege.__set_tableName(full_authorizable.table);
+          break;
+        case SentryAuthorizableScope::COLUMN:
+          privilege.__set_columnName(full_authorizable.column);
+          break;
+        default:
+          LOG(FATAL) << "not a valid scope field: " << field;
+      }
+    }
+    return privilege;
+  }
+ protected:
+  // Authorizable that has all scope fields set; useful for generating
+  // privilege requests.
+  TSentryAuthorizable full_authorizable_;
+
+ private:
+  mutable Random prng_;
 };
 
-INSTANTIATE_TEST_CASE_P(KerberosEnabled, TestTableAuthorization, ::testing::Bool());
+// Attempts to grant privileges for various actions on a single scope of an
+// authorizable, injecting various invalid privileges, and checking that Kudu
+// ignores them.
+TEST_P(SentryAuthzProviderFilterResponsesTest, TestFilterInvalidResponses) {
+  const string& table_ident = Substitute("$0.$1", full_authorizable_.db, full_authorizable_.table);
+  static constexpr InvalidPrivilege kInvalidPrivileges[] = {
+      InvalidPrivilege::INCORRECT_ACTION,
+      InvalidPrivilege::INCORRECT_SCOPE,
+      InvalidPrivilege::INCORRECT_SERVER,
+      InvalidPrivilege::FLIPPED_FIELD,
+  };
+  SentryAuthorizableScope granted_scope(GetParam());
+  for (const auto& action : kAllActions) {
+    for (const auto& ip : kInvalidPrivileges) {
+      TSentryPrivilege privilege = CreatePrivilege(full_authorizable_, granted_scope,
+                                                   SentryAction(action), ip);
+      ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+    }
+  }
+  for (const auto& requested_scope : { SentryAuthorizableScope::SERVER,
+                                       SentryAuthorizableScope::DATABASE,
+                                       SentryAuthorizableScope::TABLE }) {
+    SentryPrivilegesBranch privileges;
+    ASSERT_OK(sentry_authz_provider_->GetSentryPrivileges(
+        requested_scope, table_ident, kTestUser, &privileges));
+    // Kudu should ignore all of the invalid privileges.
+    ASSERT_TRUE(privileges.privileges.empty());
+  }
+}
 
-TEST_P(TestTableAuthorization, TestAuthorizeCreateTable) {
+// Grants privileges for various actions on a single scope of an authorizable.
+TEST_P(SentryAuthzProviderFilterResponsesTest, TestFilterValidResponses) {
+  const string& table_ident = Substitute("$0.$1", full_authorizable_.db, full_authorizable_.table);
+  SentryAuthorizableScope granted_scope(GetParam());
+  // Send valid requests and verify that we can get them back through the
+  // SentryAuthzProvider.
+  for (const auto& action : kAllActions) {
+    TSentryPrivilege privilege = CreatePrivilege(full_authorizable_, granted_scope,
+                                                 SentryAction(action));
+    ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
+  }
+  for (const auto& requested_scope : { SentryAuthorizableScope::SERVER,
+                                       SentryAuthorizableScope::DATABASE,
+                                       SentryAuthorizableScope::TABLE }) {
+    SentryPrivilegesBranch privileges;
+    ASSERT_OK(sentry_authz_provider_->GetSentryPrivileges(
+        requested_scope, table_ident, kTestUser, &privileges));
+    ASSERT_EQ(1, privileges.privileges.size());
+    const auto& authorizable_privileges = privileges.privileges[0];
+    ASSERT_EQ(GetParam(), authorizable_privileges.scope)
+        << ScopeToString(authorizable_privileges.scope);
+    ASSERT_FALSE(authorizable_privileges.granted_privileges.empty());
+  }
+}
+
+INSTANTIATE_TEST_CASE_P(GrantedScopes, SentryAuthzProviderFilterResponsesTest,
+                        ::testing::Values(SentryAuthorizableScope::SERVER,
+                                          SentryAuthorizableScope::DATABASE,
+                                          SentryAuthorizableScope::TABLE,
+                                          SentryAuthorizableScope::COLUMN));
+
+
+
+// Test to create tables requiring ALL ON DATABASE with the grant option. This
+// is parameterized on the ALL and OWNER actions, which behave
+// identically, and whether Kerberos should be enabled.
+class CreateTableAuthorizationTest : public SentryAuthzProviderTest,
+                                     public ::testing::WithParamInterface<
+                                     std::tuple<string, bool>> {
+ public:
+  bool KerberosEnabled() const override {
+    return std::get<1>(GetParam());
+  }
+};
+
+TEST_P(CreateTableAuthorizationTest, TestAuthorizeCreateTable) {
   // Don't authorize create table on a non-existent user.
   Status s = sentry_authz_provider_->AuthorizeCreateTable("db.table",
                                                           "non-existent-user",
@@ -141,14 +422,30 @@ TEST_P(TestTableAuthorization, TestAuthorizeCreateTable) {
   // requires the creating user have 'ALL on DATABASE' with grant.
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user");
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  privilege = GetDatabasePrivilege("db", "ALL");
+
+  const auto& all = std::get<0>(GetParam());
+  privilege = GetDatabasePrivilege("db", all);
   s = sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user");
   ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
-  privilege = GetDatabasePrivilege("db", "ALL", TSentryGrantOption::ENABLED);
+  privilege = GetDatabasePrivilege("db", all, TSentryGrantOption::ENABLED);
   ASSERT_OK(AlterRoleGrantPrivilege(sentry_client_.get(), kRoleName, privilege));
   ASSERT_OK(sentry_authz_provider_->AuthorizeCreateTable("db.table", kTestUser, "diff-user"));
 }
 
+INSTANTIATE_TEST_CASE_P(AllOrOwnerWithKerberos, CreateTableAuthorizationTest,
+    ::testing::Combine(::testing::Values("ALL", "OWNER"), ::testing::Bool()));
+
+// Tests to ensure SentryAuthzProvider enforces access control on tables as expected.
+// Parameterized by whether Kerberos should be enabled.
+class TestTableAuthorization : public SentryAuthzProviderTest,
+                               public ::testing::WithParamInterface<bool> {
+ public:
+  bool KerberosEnabled() const override {
+    return GetParam();
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(KerberosEnabled, TestTableAuthorization, ::testing::Bool());
 TEST_P(TestTableAuthorization, TestAuthorizeDropTable) {
   // Don't authorize delete table on a user without required privileges.
   ASSERT_OK(CreateRoleAndAddToGroups(sentry_client_.get(), kRoleName, kUserGroup));
diff --git a/src/kudu/master/sentry_authz_provider.cc b/src/kudu/master/sentry_authz_provider.cc
index 7d1a3be..a2f5840 100644
--- a/src/kudu/master/sentry_authz_provider.cc
+++ b/src/kudu/master/sentry_authz_provider.cc
@@ -18,6 +18,8 @@
 #include "kudu/master/sentry_authz_provider.h"
 
 #include <ostream>
+#include <type_traits>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -27,6 +29,7 @@
 
 #include "kudu/common/table_util.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/sentry/sentry_action.h"
 #include "kudu/sentry/sentry_client.h"
@@ -41,7 +44,9 @@ using sentry::TListSentryPrivilegesRequest;
 using sentry::TListSentryPrivilegesResponse;
 using sentry::TSentryAuthorizable;
 using sentry::TSentryGrantOption;
+using sentry::TSentryPrivilege;
 using std::string;
+using std::unordered_map;
 using std::vector;
 
 DEFINE_string(sentry_service_rpc_addresses, "",
@@ -116,8 +121,9 @@ using strings::Substitute;
 namespace kudu {
 
 using sentry::SentryAction;
-using sentry::SentryClient;
 using sentry::SentryAuthorizableScope;
+using sentry::AuthorizableScopesSet;
+using sentry::SentryClient;
 
 namespace master {
 
@@ -134,6 +140,49 @@ static bool ValidateAddresses(const char* flag_name, const string& addresses) {
 }
 DEFINE_validator(sentry_service_rpc_addresses, &ValidateAddresses);
 
+bool SentryPrivilegesBranch::Implies(SentryAuthorizableScope::Scope required_scope,
+                                     SentryAction::Action required_action,
+                                     bool requires_all_with_grant) const {
+  // In general, a privilege implies another when:
+  // 1. the authorizable from the former implies the authorizable from the latter
+  //    (authorizable with a higher scope on the hierarchy can imply authorizables
+  //    with a lower scope on the hierarchy, but not vice versa), and
+  // 2. the action from the former implies the action from the latter, and
+  // 3. grant option from the former implies the grant option from the latter.
+  //
+  // See org.apache.sentry.policy.common.CommonPrivilege. Note that policy validation
+  // in CommonPrivilege also allows wildcard authorizable matching. For example,
+  // authorizable 'server=server1->db=*' can imply authorizable 'server=server1'.
+  // However, wildcard authorizable granting is neither practical nor useful (semantics
+  // of granting such privilege are not supported in Apache Hive, Impala and Hue. And
+  // 'server=server1->db=*' has exactly the same meaning as 'server=server1'). Therefore,
+  // wildcard authorizable matching is dropped in this implementation.
+  //
+  // Moreover, because ListPrivilegesByUser lists all Sentry privileges granted to the
+  // user that match the authorizable of each scope in the input authorizable hierarchy,
+  // privileges with lower scope will also be returned in the response. This contradicts
+  // rule (1) mentioned above. Therefore, we need to validate privilege scope, in addition
+  // to action and grant option. Otherwise, privilege escalation can happen.
+  SentryAction action(required_action);
+  SentryAuthorizableScope scope(required_scope);
+  for (const auto& privilege : privileges) {
+    // A grant option cannot imply the other if the latter is set but the
+    // former is not.
+    if (requires_all_with_grant && !privilege.all_with_grant) {
+      continue;
+    }
+    // Both privilege scope and action need to imply the other.
+    if (SentryAuthorizableScope(privilege.scope).Implies(scope)) {
+      for (const auto& granted_action : privilege.granted_privileges) {
+        if (SentryAction(granted_action).Implies(action)) {
+          return true;
+        }
+      }
+    }
+  }
+  return false;
+}
+
 SentryAuthzProvider::~SentryAuthzProvider() {
   Stop();
 }
@@ -172,8 +221,8 @@ Status GetAuthorizable(const string& table_ident,
                        TSentryAuthorizable* authorizable) {
   Slice database;
   Slice table;
-  // Authorizable scope for table authorizable type must be equal or higher than
-  // 'TABLE'.
+  // We should only ever request privileges from Sentry for authorizables of
+  // scope equal to or higher than 'TABLE'.
   DCHECK_NE(scope, SentryAuthorizableScope::Scope::COLUMN);
   switch (scope) {
     case SentryAuthorizableScope::Scope::TABLE:
@@ -254,92 +303,249 @@ Status SentryAuthzProvider::AuthorizeAlterTable(const string& old_table,
                    new_table, user);
 }
 
-Status SentryAuthzProvider::AuthorizeGetTableMetadata(const std::string& table_name,
-                                                      const std::string& user) {
+Status SentryAuthzProvider::AuthorizeGetTableMetadata(const string& table_name,
+                                                      const string& user) {
   // Retrieving table metadata requires 'METADATA ON TABLE' privilege.
   return Authorize(SentryAuthorizableScope::Scope::TABLE,
                    SentryAction::Action::METADATA,
                    table_name, user);
 }
 
-Status SentryAuthzProvider::Authorize(SentryAuthorizableScope::Scope scope,
-                                      SentryAction::Action action,
-                                      const string& table_ident,
-                                      const string& user,
-                                      bool require_grant_option) {
+const AuthorizableScopesSet& SentryAuthzProvider::ExpectedEmptyFields(
+    SentryAuthorizableScope::Scope scope) {
+  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::DATABASE,
+                                                    SentryAuthorizableScope::TABLE,
+                                                    SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::TABLE,
+                                                SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kColumnFields{};
+  switch (scope) {
+    case SentryAuthorizableScope::SERVER:
+      return kServerFields;
+    case SentryAuthorizableScope::DATABASE:
+      return kDbFields;
+    case SentryAuthorizableScope::TABLE:
+      return kTableFields;
+    case SentryAuthorizableScope::COLUMN:
+      return kColumnFields;
+    default:
+      LOG(DFATAL) << "not reachable";
+  }
+  return kColumnFields;
+}
 
-  if (AuthzProvider::IsTrustedUser(user)) {
-    return Status::OK();
+const AuthorizableScopesSet& SentryAuthzProvider::ExpectedNonEmptyFields(
+    SentryAuthorizableScope::Scope scope) {
+  AuthorizableScopesSet expected_nonempty_fields;
+  static const AuthorizableScopesSet kColumnFields{ SentryAuthorizableScope::SERVER,
+                                                    SentryAuthorizableScope::DATABASE,
+                                                    SentryAuthorizableScope::TABLE,
+                                                    SentryAuthorizableScope::COLUMN };
+  static const AuthorizableScopesSet kTableFields{ SentryAuthorizableScope::SERVER,
+                                                   SentryAuthorizableScope::DATABASE,
+                                                   SentryAuthorizableScope::TABLE };
+  static const AuthorizableScopesSet kDbFields{ SentryAuthorizableScope::SERVER,
+                                                SentryAuthorizableScope::DATABASE };
+  static const AuthorizableScopesSet kServerFields{ SentryAuthorizableScope::SERVER };
+  switch (scope) {
+    case SentryAuthorizableScope::COLUMN:
+      return kColumnFields;
+    case SentryAuthorizableScope::TABLE:
+      return kTableFields;
+    case SentryAuthorizableScope::DATABASE:
+      return kDbFields;
+    case SentryAuthorizableScope::SERVER:
+      return kServerFields;
+    default:
+      LOG(DFATAL) << "not reachable";
   }
+  return kColumnFields;
+}
 
-  TSentryAuthorizable authorizable;
-  RETURN_NOT_OK(GetAuthorizable(table_ident, scope, &authorizable));
+bool SentryAuthzProvider::SentryPrivilegeIsWellFormed(
+    const TSentryPrivilege& privilege,
+    const TSentryAuthorizable& requested_authorizable,
+    SentryAuthorizableScope::Scope* scope,
+    SentryAction::Action* action) {
+  DCHECK_EQ(FLAGS_server_name, requested_authorizable.server);
+  DCHECK(!requested_authorizable.server.empty());
+  DCHECK(requested_authorizable.column.empty());
+
+  // A requested table must be accompanied by a database.
+  bool authorizable_has_db = !requested_authorizable.db.empty();
+  bool authorizable_has_table = !requested_authorizable.table.empty();
+  DCHECK((authorizable_has_db && authorizable_has_table) || !authorizable_has_table);
+
+  // Ignore anything that isn't a Kudu-related privilege.
+  SentryAuthorizableScope granted_scope;
+  SentryAction granted_action;
+  Status s = SentryAuthorizableScope::FromString(privilege.privilegeScope, &granted_scope)
+      .AndThen([&] {
+        return SentryAction::FromString(privilege.action, &granted_action);
+      });
+  if (!s.ok()) {
+    return false;
+  }
 
-  // In general, a privilege implies another when:
-  // 1. the authorizable from the former implies the authorizable from the latter
-  //    (authorizable with a higher scope on the hierarchy can imply authorizables
-  //    with a lower scope on the hierarchy, but not vice versa), and
-  // 2. the action from the former implies the action from the latter, and
-  // 3. grant option from the former implies the grant option from the latter.
-  //
-  // See org.apache.sentry.policy.common.CommonPrivilege. Note that policy validation
-  // in CommonPrivilege also allows wildcard authorizable matching. For example,
-  // authorizable 'server=server1->db=*' can imply authorizable 'server=server1'.
-  // However, wildcard authorizable granting is neither practical nor useful (semantics
-  // of granting such privilege are not supported in Apache Hive, Impala and Hue. And
-  // 'server=server1->db=*' has exactly the same meaning as 'server=server1'). Therefore,
-  // wildcard authorizable matching is dropped in this implementation.
-  //
-  // Moreover, because ListPrivilegesByUser lists all Sentry privileges granted to the
-  // user that match the authorizable of each scope in the input authorizable hierarchy,
-  // privileges with lower scope will also be returned in the response. This contradicts
-  // rule (1) mentioned above. Therefore, we need to validate privilege scope, in addition
-  // to action and grant option. Otherwise, privilege escalation can happen.
+  // Make sure that there aren't extraneous fields set in the privilege.
+  for (const auto& empty_field : ExpectedEmptyFields(granted_scope.scope())) {
+    switch (empty_field) {
+      case SentryAuthorizableScope::COLUMN:
+        if (!privilege.columnName.empty()) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::TABLE:
+        if (!privilege.tableName.empty()) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::DATABASE:
+        if (!privilege.dbName.empty()) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::SERVER:
+        if (!privilege.serverName.empty()) {
+          return false;
+        }
+        break;
+      default:
+        LOG(DFATAL) << Substitute("Granted privilege has invalid scope: $0",
+                                  sentry::ScopeToString(granted_scope.scope()));
+    }
+  }
+  // Make sure that all expected fields are set, and that they match those in
+  // the requested authorizable.
+  for (const auto& nonempty_field : ExpectedNonEmptyFields(granted_scope.scope())) {
+    switch (nonempty_field) {
+      case SentryAuthorizableScope::COLUMN:
+        if (!privilege.__isset.columnName || privilege.columnName.empty()) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::TABLE:
+        if (!privilege.__isset.tableName || privilege.tableName.empty() ||
+            (authorizable_has_table &&
+             !boost::iequals(privilege.tableName, requested_authorizable.table))) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::DATABASE:
+        if (!privilege.__isset.dbName || privilege.dbName.empty() ||
+            (authorizable_has_db &&
+             !boost::iequals(privilege.dbName, requested_authorizable.db))) {
+          return false;
+        }
+        break;
+      case SentryAuthorizableScope::SERVER:
+        if (privilege.serverName.empty() ||
+            !boost::iequals(privilege.serverName, requested_authorizable.server)) {
+          return false;
+        }
+        break;
+      default:
+        LOG(DFATAL) << Substitute("Granted privilege has invalid scope: $0",
+                                  sentry::ScopeToString(granted_scope.scope()));
+    }
+  }
+  *scope = granted_scope.scope();
+  *action = granted_action.action();
+  return true;
+}
+
+namespace {
+
+// Returns a unique string key for the given authorizable, at the given scope.
+// The authorizable must be well-formed at the given scope.
+string GetKey(const string& server, const string& db, const string& table, const string& column,
+              SentryAuthorizableScope::Scope scope) {
+  DCHECK(!server.empty());
+  switch (scope) {
+    case SentryAuthorizableScope::SERVER:
+      return server;
+    case SentryAuthorizableScope::DATABASE:
+      DCHECK(!db.empty());
+      return Substitute("$0/$1", server, db);
+    case SentryAuthorizableScope::TABLE:
+      DCHECK(!db.empty() && !table.empty());
+      return Substitute("$0/$1/$2", server, db, table);
+    case SentryAuthorizableScope::COLUMN:
+      DCHECK(!db.empty() && !table.empty() && !column.empty());
+      return Substitute("$0/$1/$2/$3", server, db, table, column);
+    default:
+      LOG(DFATAL) << "not reachable";
+  }
+  return "";
+}
+
+} // anonymous namespace
+
+Status SentryAuthzProvider::GetSentryPrivileges(SentryAuthorizableScope::Scope scope,
+                                                const string& table_name,
+                                                const string& user,
+                                                SentryPrivilegesBranch* privileges) {
+  TSentryAuthorizable requested_authorizable;
+  RETURN_NOT_OK(GetAuthorizable(table_name, scope, &requested_authorizable));
 
   TListSentryPrivilegesRequest request;
   request.__set_requestorUserName(FLAGS_kudu_service_name);
   request.__set_principalName(user);
-  request.__set_authorizableHierarchy(authorizable);
+  request.__set_authorizableHierarchy(requested_authorizable);
   TListSentryPrivilegesResponse response;
-
   RETURN_NOT_OK(ha_client_.Execute(
       [&] (SentryClient* client) {
         return client->ListPrivilegesByUser(request, &response);
       }));
-
-  SentryAction required_action(action);
-  SentryAuthorizableScope required_scope(scope);
-  for (const auto& privilege : response.privileges) {
-    // A grant option cannot imply the other if the latter is set
-    // but the former is not.
-    if (require_grant_option && privilege.grantOption != TSentryGrantOption::ENABLED) {
+  unordered_map<string, AuthorizablePrivileges> privileges_map;
+  for (const auto& privilege_resp : response.privileges) {
+    SentryAuthorizableScope::Scope granted_scope;
+    SentryAction::Action granted_action;
+    if (!SentryPrivilegeIsWellFormed(privilege_resp, requested_authorizable,
+                                     &granted_scope, &granted_action)) {
+      if (VLOG_IS_ON(1)) {
+        std::ostringstream os;
+        privilege_resp.printTo(os);
+        VLOG(1) << Substitute("Ignoring privilege response: $0", os.str());
+      }
       continue;
     }
-
-    SentryAction granted_action;
-    Status s = SentryAction::FromString(privilege.action, &granted_action);
-    if (!s.ok()) {
-      LOG(WARNING) << s.ToString();
-      continue;
+    const auto& db = privilege_resp.dbName;
+    const auto& table = privilege_resp.tableName;
+    const auto& column = privilege_resp.columnName;
+    const string authorizable_key = GetKey(privilege_resp.serverName, db, table, column,
+                                           granted_scope);
+    AuthorizablePrivileges& privilege = LookupOrInsert(&privileges_map, authorizable_key,
+        AuthorizablePrivileges(granted_scope, db, table, column));
+    InsertIfNotPresent(&privilege.granted_privileges, granted_action);
+    if ((granted_action == SentryAction::ALL || granted_action == SentryAction::OWNER) &&
+        privilege_resp.grantOption == TSentryGrantOption::ENABLED) {
+      privilege.all_with_grant = true;
     }
+  }
+  EmplaceValuesFromMap(std::move(privileges_map), &privileges->privileges);
+  return Status::OK();
+}
 
-    SentryAuthorizableScope granted_scope;
-    s = SentryAuthorizableScope::FromString(privilege.privilegeScope, &granted_scope);
-    if (!s.ok()) {
-      LOG(WARNING) << s.ToString();
-      continue;
-    }
+Status SentryAuthzProvider::Authorize(SentryAuthorizableScope::Scope scope,
+                                      SentryAction::Action action,
+                                      const string& table_ident,
+                                      const string& user,
+                                      bool require_grant_option) {
+  if (AuthzProvider::IsTrustedUser(user)) {
+    return Status::OK();
+  }
 
-    // Both privilege scope and action need to imply the other.
-    if (granted_action.Implies(required_action) &&
-        granted_scope.Implies(required_scope)) {
-      return Status::OK();
-    }
+  SentryPrivilegesBranch privileges;
+  RETURN_NOT_OK(GetSentryPrivileges(scope, table_ident, user, &privileges));
+  if (privileges.Implies(scope, action, require_grant_option)) {
+    return Status::OK();
   }
 
-  // Logs a warning if the action is not authorized for debugging purpose, and
-  // only returns generic error back to the users to avoid side channel leak,
-  // e.g. 'whether table A exists'.
+  // Log a warning if the action is not authorized, for debugging purposes, and
+  // only return a generic error to users to avoid a side-channel leak, e.g.
+  // whether table A exists.
   LOG(WARNING) << Substitute("Action <$0> on table <$1> with authorizable scope "
                              "<$2> is not permitted for user <$3>",
                              sentry::ActionToString(action),
diff --git a/src/kudu/master/sentry_authz_provider.h b/src/kudu/master/sentry_authz_provider.h
index b9e290d..51d5cc1 100644
--- a/src/kudu/master/sentry_authz_provider.h
+++ b/src/kudu/master/sentry_authz_provider.h
@@ -17,10 +17,15 @@
 
 #pragma once
 
+#include <ostream>
 #include <string>
+#include <utility>
+#include <vector>
 
+#include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/master/authz_provider.h"
 #include "kudu/sentry/sentry_action.h"
@@ -29,10 +34,78 @@
 #include "kudu/thrift/client.h"
 #include "kudu/util/status.h"
 
+namespace sentry {
+class TSentryAuthorizable;
+class TSentryPrivilege;
+} // namespace sentry
+
 namespace kudu {
 
 namespace master {
 
+// Utility struct to facilitate evaluating the privileges of a given
+// authorizable. This is preferred to using Sentry's Thrift responses directly,
+// since useful information has already been parsed to generate this struct
+// (e.g. the SentryActions and scope).
+struct AuthorizablePrivileges {
+  AuthorizablePrivileges(sentry::SentryAuthorizableScope::Scope scope,
+                         std::string db,
+                         std::string table,
+                         std::string column)
+    : scope(scope),
+      db_name(std::move(db)),
+      table_name(std::move(table)),
+      column_name(std::move(column)) {
+#ifndef NDEBUG
+    switch (scope) {
+      case sentry::SentryAuthorizableScope::COLUMN:
+        CHECK(!column_name.empty());
+        FALLTHROUGH_INTENDED;
+      case sentry::SentryAuthorizableScope::TABLE:
+        CHECK(!table_name.empty());
+        FALLTHROUGH_INTENDED;
+      case sentry::SentryAuthorizableScope::DATABASE:
+        CHECK(!db_name.empty());
+        break;
+      case sentry::SentryAuthorizableScope::SERVER:
+        break;
+      default:
+        LOG(FATAL) << "not reachable";
+    }
+#endif
+  }
+
+  // Whether the privilege 'ALL' or 'OWNER' has been granted with Sentry's
+  // grant option enabled. Note that the grant option can be granted on any
+  // action, but for Kudu, we only use it with 'ALL' or 'OWNER'.
+  bool all_with_grant = false;
+
+  // The scope of the authorizable being granted privileges.
+  const sentry::SentryAuthorizableScope::Scope scope;
+
+  // The set of actions on which privileges are granted.
+  sentry::SentryActionsSet granted_privileges;
+
+  // The fields of the authorizable.
+  std::string db_name;
+  std::string table_name;
+  std::string column_name;
+};
+
+// A representation of the Sentry privilege hierarchy branch for a single table
+// (including privileges for the table's ancestors and descendents) for a
+// single user.
+struct SentryPrivilegesBranch {
+  // The set of privileges that are granted.
+  std::vector<AuthorizablePrivileges> privileges;
+
+  // Returns whether or not this implies the given action and scope for the
+  // given table.
+  bool Implies(sentry::SentryAuthorizableScope::Scope required_scope,
+               sentry::SentryAction::Action required_action,
+               bool requires_all_with_grant) const;
+};
+
 // An implementation of AuthzProvider that connects to the Sentry service
 // for authorization metadata and allows or denies the actions performed by
 // users based on the metadata.
@@ -75,7 +148,44 @@ class SentryAuthzProvider : public AuthzProvider {
                                    const std::string& user) override WARN_UNUSED_RESULT;
 
  private:
+  friend class SentryAuthzProviderFilterResponsesTest;
+  FRIEND_TEST(SentryAuthzProviderStaticTest, TestPrivilegesWellFormed);
   FRIEND_TEST(TestAuthzHierarchy, TestAuthorizableScope);
+  FRIEND_TEST(SentryAuthzProviderFilterResponsesTest, TestFilterInvalidResponses);
+  FRIEND_TEST(SentryAuthzProviderFilterResponsesTest, TestFilterValidResponses);
+
+  // Utility function to determine whether the given privilege is a well-formed
+  // possibly Kudu-related privilege describing a descendent or ancestor of the
+  // requested authorizable in the Sentry hierarchy tree, i.e. it:
+  // - has a Kudu-related action (e.g. ALL, INSERT, UPDATE, etc.),
+  // - has a Kudu-related authorizable scope (e.g. SERVER, DATABASE, etc.),
+  // - has all fields of equal or higher scope than the privilege's scope set,
+  //   and none of lower scope set, and
+  // - has all of its set fields matching those set by the input authorizable.
+  static bool SentryPrivilegeIsWellFormed(
+      const ::sentry::TSentryPrivilege& privilege,
+      const ::sentry::TSentryAuthorizable& requested_authorizable,
+      sentry::SentryAuthorizableScope::Scope* scope,
+      sentry::SentryAction::Action* action);
+
+  // Returns the set of scope fields expected to be non-empty in a Sentry
+  // response with the given authorizable scope. All fields of equal or higher
+  // scope are expected to be set.
+  static const sentry::AuthorizableScopesSet& ExpectedNonEmptyFields(
+      sentry::SentryAuthorizableScope::Scope scope);
+
+  // Returns the set of scope fields expected to be empty in a Sentry response
+  // with the given authorizable scope. All fields of lower scope are expected
+  // to be empty.
+  static const sentry::AuthorizableScopesSet& ExpectedEmptyFields(
+      sentry::SentryAuthorizableScope::Scope scope);
+
+  // Fetches the user's privileges from Sentry for the authorizable specified
+  // by the given table and scope.
+  Status GetSentryPrivileges(sentry::SentryAuthorizableScope::Scope scope,
+                             const std::string& table_name,
+                             const std::string& user,
+                             SentryPrivilegesBranch* privileges);
 
   // Checks if the user can perform an action on the table identifier (in the format
   // <database-name>.<table-name>), based on the given authorizable scope and the
diff --git a/src/kudu/sentry/sentry_action.h b/src/kudu/sentry/sentry_action.h
index d12613a..882e7ff 100644
--- a/src/kudu/sentry/sentry_action.h
+++ b/src/kudu/sentry/sentry_action.h
@@ -21,6 +21,7 @@
 #include <string>
 
 #include "kudu/gutil/port.h"
+#include "kudu/util/bitset.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -47,7 +48,7 @@ class SentryAction {
   // only to represent an action in uninitialized state.
   //
   // See org.apache.sentry.core.model.db.HiveActionFactory.
-  enum class Action {
+  enum Action {
     UNINITIALIZED,
     ALL,
     METADATA,
@@ -60,6 +61,7 @@ class SentryAction {
     DROP,
     OWNER,
   };
+  static const size_t kMaxAction = Action::OWNER + 1;
 
   // The default constructor is useful when creating an Action
   // from string.
@@ -102,5 +104,8 @@ const char* ActionToString(SentryAction::Action action);
 
 std::ostream& operator<<(std::ostream& o, SentryAction::Action action);
 
+typedef FixedBitSet<sentry::SentryAction::Action, sentry::SentryAction::kMaxAction>
+    SentryActionsSet;
+
 } // namespace sentry
 } // namespace kudu
diff --git a/src/kudu/sentry/sentry_authorizable_scope.h b/src/kudu/sentry/sentry_authorizable_scope.h
index 57ee201..b0f6edc 100644
--- a/src/kudu/sentry/sentry_authorizable_scope.h
+++ b/src/kudu/sentry/sentry_authorizable_scope.h
@@ -20,7 +20,9 @@
 #include <iosfwd>
 #include <string>
 
+#include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
+#include "kudu/util/bitset.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -39,13 +41,14 @@ class SentryAuthorizableScope {
 
   // Note that 'UNINITIALIZED' is not an actual scope but
   // only to represent the uninitialized state.
-  enum class Scope {
+  enum Scope {
     UNINITIALIZED,
     SERVER,
     DATABASE,
     TABLE,
     COLUMN,
   };
+  static const size_t kScopeMaxVal = Scope::COLUMN + 1;
 
   // The default constructor is useful when creating an authorizable scope
   // from string.
@@ -79,5 +82,8 @@ const char* ScopeToString(SentryAuthorizableScope::Scope scope);
 
 std::ostream& operator<<(std::ostream& o, SentryAuthorizableScope::Scope scope);
 
+typedef FixedBitSet<SentryAuthorizableScope::Scope, SentryAuthorizableScope::kScopeMaxVal>
+    AuthorizableScopesSet;
+
 } // namespace sentry
 } // namespace kudu