You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/04/11 01:19:12 UTC

[kudu] 04/04: [SentryPrivilegesFetcher] deduplicate RPCs to Sentry

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 945148372270f2dc56a76d5d76705cc472a3a6f0
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Apr 8 09:26:29 2019 -0700

    [SentryPrivilegesFetcher] deduplicate RPCs to Sentry
    
    With this patch, concurrent requests to Sentry with the same set
    of parameters are queued, so the actual number of RPC requests sent
    to Sentry is reduced.
    
    Added a couple of tests to cover the newly added functionality.
    
    Change-Id: I2714ab032df6029240ad9e7f4fb03ff73c6b79ef
    Reviewed-on: http://gerrit.cloudera.org:8080/12967
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/master/CMakeLists.txt                |   2 +-
 src/kudu/master/sentry_authz_provider-test.cc | 125 ++++++++++++++++++++++++++
 src/kudu/master/sentry_privileges_fetcher.cc  |  75 ++++++++++++----
 src/kudu/master/sentry_privileges_fetcher.h   |  15 +++-
 4 files changed, 197 insertions(+), 20 deletions(-)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 0b8c40e..aeaaf44 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -89,7 +89,7 @@ ADD_KUDU_TEST(master-test RESOURCE_LOCK "master-web-port"
                           DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(mini_master-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(placement_policy-test)
-ADD_KUDU_TEST(sentry_authz_provider-test NUM_SHARDS 4)
+ADD_KUDU_TEST(sentry_authz_provider-test NUM_SHARDS 8)
 ADD_KUDU_TEST(sys_catalog-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(ts_descriptor-test DATA_FILES ../scripts/first_argument.sh)
 
diff --git a/src/kudu/master/sentry_authz_provider-test.cc b/src/kudu/master/sentry_authz_provider-test.cc
index f83534f..8d015cc 100644
--- a/src/kudu/master/sentry_authz_provider-test.cc
+++ b/src/kudu/master/sentry_authz_provider-test.cc
@@ -17,10 +17,12 @@
 
 #include "kudu/master/sentry_authz_provider.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <unordered_set>
 #include <vector>
@@ -38,6 +40,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/master/sentry_authz_provider-test-base.h"
 #include "kudu/master/sentry_privileges_fetcher.h"
 #include "kudu/security/token.pb.h"
@@ -47,6 +50,7 @@
 #include "kudu/sentry/sentry_authorizable_scope.h"
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/sentry/sentry_policy_service_types.h"
+#include "kudu/util/barrier.h"
 #include "kudu/util/hdr_histogram.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/net_util.h"
@@ -84,6 +88,7 @@ using sentry::TSentryAuthorizable;
 using sentry::TSentryGrantOption;
 using sentry::TSentryPrivilege;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::unordered_map;
 using std::unordered_set;
@@ -938,5 +943,125 @@ TEST_F(TestSentryClientMetrics, Basic) {
   ASSERT_LT(2000000, hist->histogram()->TotalSum());
 }
 
+enum class ThreadsNumPolicy {
+  CloseToCPUsNum,
+  MoreThanCPUsNum,
+};
+
+// Test to ensure concurrent requests to Sentry with the same set of parameters
+// are accumulated by SentryAuthzProvider, so in total there are fewer RPC
+// requests sent to Sentry than the total number of concurrent requests to
+// the provider (ideally, there should be just a single request to Sentry).
+class TestConcurrentRequests :
+    public SentryAuthzProviderTest,
+    public ::testing::WithParamInterface<std::tuple<ThreadsNumPolicy,
+                                                    AuthzCaching>> {
+ public:
+  bool CachingEnabled() const override {
+    return std::get<1>(GetParam()) == AuthzCaching::Enabled;
+  }
+};
+
+// Verify how multiple concurrent requests are handled when Sentry responds
+// with success.
+TEST_P(TestConcurrentRequests, SuccessResponses) {
+  const auto kNumRequestThreads =
+      std::get<0>(GetParam()) == ThreadsNumPolicy::CloseToCPUsNum
+      ? std::min(base::NumCPUs(), 4) : base::NumCPUs() * 3;
+
+  ASSERT_OK(CreateRoleAndAddToGroups());
+  const auto privilege = GetDatabasePrivilege("db", "METADATA");
+  ASSERT_OK(AlterRoleGrantPrivilege(privilege));
+
+  Barrier barrier(kNumRequestThreads);
+
+  vector<thread> threads;
+  vector<Status> thread_status(kNumRequestThreads);
+  for (auto i = 0; i < kNumRequestThreads; ++i) {
+    const auto thread_idx = i;
+    threads.emplace_back([&, thread_idx] () {
+      barrier.Wait();
+      thread_status[thread_idx] = sentry_authz_provider_->
+          AuthorizeGetTableMetadata("db.table", kTestUser);
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  for (const auto& s : thread_status) {
+    ASSERT_TRUE(s.ok()) << s.ToString();
+  }
+
+  const auto sentry_rpcs_num = GetTasksSuccessful();
+  // Ideally all requests should result in a single RPC sent to Sentry, but some
+  // scheduling anomalies might occur so even the current threshold of maximum
+  // (kNumRequestThreads / 2) of actual RPC requests to Sentry might be reached
+  // and the assertion below would be triggered. For example, the OS scheduler
+  // might de-schedule the majority of the threads spawned above for a time
+  // greater than it takes to complete an RPC to Sentry, and the de-scheduling
+  // might happen exactly prior to the point when the 'earlier-running' thread
+  // added itself into a queue record designed to track concurrent requests.
+  // Essentially, that's about 'freezing' all incoming requests just before the
+  // queueing point, and then awakening them one by one, so no more than one
+  // thread is registered in the queue at any time.
+  //
+  // However, those anomalies are expected to be exceptionally rare. In fact,
+  // (kNumRequestThreads / 2) seems to be a good enough threshold even for TSAN
+  // builds while running the test scenario with --stress_cpu_threads=16.
+  ASSERT_GE(kNumRequestThreads / 2, sentry_rpcs_num);
+
+  // Issue the same request once more. If caching is enabled, there should be
+  // no additional RPCs sent to Sentry.
+  ASSERT_OK(sentry_authz_provider_->AuthorizeGetTableMetadata(
+      "db.table", kTestUser));
+  ASSERT_EQ(CachingEnabled() ? sentry_rpcs_num : sentry_rpcs_num + 1,
+            GetTasksSuccessful());
+}
+
+// Verify how multiple concurrent requests are handled when Sentry responds
+// with errors.
+TEST_P(TestConcurrentRequests, FailureResponses) {
+  const auto kNumRequestThreads =
+      std::get<0>(GetParam()) == ThreadsNumPolicy::CloseToCPUsNum
+      ? std::min(base::NumCPUs(), 4) : base::NumCPUs() * 3;
+
+  Barrier barrier(kNumRequestThreads);
+
+  vector<thread> threads;
+  vector<Status> thread_status(kNumRequestThreads);
+  for (auto i = 0; i < kNumRequestThreads; ++i) {
+    const auto thread_idx = i;
+    threads.emplace_back([&, thread_idx] () {
+      barrier.Wait();
+      thread_status[thread_idx] = sentry_authz_provider_->
+          AuthorizeCreateTable("db.table", "nobody", "nobody");
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+  for (const auto& s : thread_status) {
+    ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  }
+  ASSERT_EQ(0, GetTasksSuccessful());
+  ASSERT_EQ(0, GetTasksFailedNonFatal());
+  const auto sentry_rpcs_num = GetTasksFailedFatal();
+  // See the TestConcurrentRequests.SuccessResponses scenario above for details
+  // on setting the threshold for 'sentry_rpcs_num'.
+  ASSERT_GE(kNumRequestThreads / 2, sentry_rpcs_num);
+
+  // The cache does not store negative responses/errors, so in both the caching
+  // and non-caching cases there should be one extra RPC sent to Sentry.
+  auto s = sentry_authz_provider_->AuthorizeCreateTable(
+      "db.table", "nobody", "nobody");
+  ASSERT_TRUE(s.IsNotAuthorized()) << s.ToString();
+  ASSERT_EQ(sentry_rpcs_num + 1, GetTasksFailedFatal());
+}
+INSTANTIATE_TEST_CASE_P(QueueingConcurrentRequests, TestConcurrentRequests,
+    ::testing::Combine(::testing::Values(ThreadsNumPolicy::CloseToCPUsNum,
+                                         ThreadsNumPolicy::MoreThanCPUsNum),
+                       ::testing::Values(AuthzCaching::Disabled,
+                                         AuthzCaching::Enabled)));
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/sentry_privileges_fetcher.cc b/src/kudu/master/sentry_privileges_fetcher.cc
index 0e3aad1..6fdaad0 100644
--- a/src/kudu/master/sentry_privileges_fetcher.cc
+++ b/src/kudu/master/sentry_privileges_fetcher.cc
@@ -20,6 +20,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <type_traits>
 #include <unordered_map>
@@ -31,6 +32,7 @@
 #include <glog/logging.h>
 
 #include "kudu/common/table_util.h"
+#include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -43,6 +45,7 @@
 #include "kudu/sentry/sentry_policy_service_types.h"
 #include "kudu/thrift/client.h"
 #include "kudu/thrift/ha_client_metrics.h"
+#include "kudu/util/async_util.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/malloc.h"
 #include "kudu/util/monotime.h"
@@ -147,6 +150,7 @@ using sentry::TListSentryPrivilegesResponse;
 using sentry::TSentryAuthorizable;
 using sentry::TSentryGrantOption;
 using sentry::TSentryPrivilege;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
@@ -431,24 +435,56 @@ Status SentryPrivilegesFetcher::GetSentryPrivileges(
     return Status::OK();
   }
 
-  TListSentryPrivilegesResponse response;
-  RETURN_NOT_OK(FetchPrivilegesFromSentry(FLAGS_kudu_service_name,
-                                          user, authorizable, &response));
-  SentryPrivilegesBranch result(authorizable, response);
-  if (PREDICT_FALSE(!cache_)) {
-    *privileges = std::move(result);
+  Synchronizer sync;
+  bool is_first_request = false;
+  // The result (i.e. fetched privileges) might be used independently
+  // by multiple threads. The shared ownership approach simplifies passing
+  // the information around.
+  shared_ptr<SentryPrivilegesBranch> fetched_privileges;
+  {
+    std::lock_guard<simple_spinlock> l(pending_requests_lock_);
+    auto& pending_request = LookupOrEmplace(&pending_requests_,
+                                            key, SentryRequestsInfo());
+    // Is the queue of pending requests for the same key empty?
+    // If yes, that's the first request being sent out.
+    is_first_request = pending_request.callbacks.empty();
+    pending_request.callbacks.emplace_back(sync.AsStatusCallback());
+    if (is_first_request) {
+      DCHECK(!pending_request.result);
+      pending_request.result = std::make_shared<SentryPrivilegesBranch>();
+    }
+    fetched_privileges = pending_request.result;
+  }
+  if (!is_first_request) {
+    RETURN_NOT_OK(sync.Wait());
+    *privileges = *fetched_privileges;
     return Status::OK();
   }
 
-  // Put the result into the cache.
-  unique_ptr<SentryPrivilegesBranch> result_ptr(
-      new SentryPrivilegesBranch(result));
-  const auto result_footprint = result_ptr->memory_footprint() + key.capacity();
-  cache_->Put(key, std::move(result_ptr), result_footprint);
-  VLOG(2) << Substitute("cached entry of size $0 bytes for key '$1'",
-                        result_footprint, key);
+  const auto s = FetchPrivilegesFromSentry(FLAGS_kudu_service_name,
+                                           user, authorizable,
+                                           fetched_privileges.get());
+  if (s.ok() && PREDICT_TRUE(cache_)) {
+    // Put the result into the cache. Negative results/errors are not cached.
+    unique_ptr<SentryPrivilegesBranch> result_ptr(
+        new SentryPrivilegesBranch(*fetched_privileges));
+    const auto result_footprint = result_ptr->memory_footprint() + key.capacity();
+    cache_->Put(key, std::move(result_ptr), result_footprint);
+    VLOG(2) << Substitute("cached entry of size $0 bytes for key '$1'",
+                          result_footprint, key);
+  }
 
-  *privileges = std::move(result);
+  SentryRequestsInfo info;
+  {
+    std::lock_guard<simple_spinlock> l(pending_requests_lock_);
+    info = EraseKeyReturnValuePtr(&pending_requests_, key);
+  }
+  CHECK_LE(1, info.callbacks.size());
+  for (auto& cb : info.callbacks) {
+    cb.Run(s);
+  }
+  RETURN_NOT_OK(s);
+  *privileges = *fetched_privileges;
   return Status::OK();
 }
 
@@ -604,15 +640,18 @@ Status SentryPrivilegesFetcher::FetchPrivilegesFromSentry(
     const string& service_name,
     const string& user,
     const TSentryAuthorizable& authorizable,
-    TListSentryPrivilegesResponse* response) {
+    SentryPrivilegesBranch* result) {
   TListSentryPrivilegesRequest request;
   request.__set_requestorUserName(service_name);
   request.__set_principalName(user);
   request.__set_authorizableHierarchy(authorizable);
-  return sentry_client_.Execute(
+  TListSentryPrivilegesResponse response;
+  RETURN_NOT_OK(sentry_client_.Execute(
       [&] (SentryClient* client) {
-        return client->ListPrivilegesByUser(request, response);
-      });
+        return client->ListPrivilegesByUser(request, &response);
+      }));
+  *result = SentryPrivilegesBranch(authorizable, response);
+  return Status::OK();
 }
 
 Status SentryPrivilegesFetcher::ResetCache() {
diff --git a/src/kudu/master/sentry_privileges_fetcher.h b/src/kudu/master/sentry_privileges_fetcher.h
index 23ef6b4..bed8f1c 100644
--- a/src/kudu/master/sentry_privileges_fetcher.h
+++ b/src/kudu/master/sentry_privileges_fetcher.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -35,8 +36,10 @@
 #include "kudu/sentry/sentry_client.h"
 #include "kudu/thrift/client.h"
 #include "kudu/util/bitset.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
 #include "kudu/util/ttl_cache.h"
 
 namespace sentry {
@@ -196,7 +199,7 @@ class SentryPrivilegesFetcher {
       const std::string& service_name,
       const std::string& user,
       const ::sentry::TSentryAuthorizable& authorizable,
-      ::sentry::TListSentryPrivilegesResponse* response);
+      SentryPrivilegesBranch* result);
 
   // Resets the authz cache. In addition to lifecycle-related methods like
   // Start(), this method is also used by tests: if the authz information
@@ -220,6 +223,16 @@ class SentryPrivilegesFetcher {
   // The TTL cache to store information on privileges received from Sentry.
   typedef TTLCache<std::string, SentryPrivilegesBranch> AuthzInfoCache;
   std::unique_ptr<AuthzInfoCache> cache_;
+
+  // Utility dictionary to keep track of requests sent to Sentry. Access is
+  // guarded by pending_requests_lock_. The key corresponds to the set of
+  // parameters for a request sent to Sentry.
+  struct SentryRequestsInfo {
+    std::vector<StatusCallback> callbacks;
+    std::shared_ptr<SentryPrivilegesBranch> result;
+  };
+  std::unordered_map<std::string, SentryRequestsInfo> pending_requests_;
+  simple_spinlock pending_requests_lock_;
 };
 
 } // namespace master