Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/20 04:51:51 UTC

[kudu] branch master updated (0923488 -> fefb433)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 0923488  util: stop using namespaces
     new 8026237  [ranger] pass 'principal' and 'keytab' to the subprocess
     new 1a21e92  rpc: reduce context switches and receive calls
     new fefb433  rpc: use a lighter weight completion for sync RPCs

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/ranger/CMakeLists.txt   |   3 +-
 src/kudu/ranger/ranger_client.cc |  90 +++++++++++++++++--------
 src/kudu/ranger/ranger_client.h  |  18 +++--
 src/kudu/rpc/connection.cc       |  17 +++--
 src/kudu/rpc/proxy.cc            |  11 ++--
 src/kudu/rpc/reactor.cc          |   6 +-
 src/kudu/rpc/transfer.cc         |  70 +++++++++++++-------
 src/kudu/rpc/transfer.h          |  11 +++-
 src/kudu/security/init.cc        |  54 ++++++++-------
 src/kudu/security/init.h         |   6 ++
 src/kudu/server/server_base.cc   |  18 +----
 src/kudu/util/notification.h     | 137 +++++++++++++++++++++++++++++++++++++++
 12 files changed, 324 insertions(+), 117 deletions(-)
 create mode 100644 src/kudu/util/notification.h


[kudu] 02/03: rpc: reduce context switches and receive calls

Posted by to...@apache.org.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1a21e92057f61bd3b0581ac590a84bc7d7a21ac6
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 5 12:49:07 2020 -0800

    rpc: reduce context switches and receive calls
    
    * When queueing a task to execute on the reactor, avoid writing to the
      eventfd to wake it up if such a write has already been done. This
      should reduce the number of read/write syscalls to the eventfd and
      avoid "spurious" wakeups of the reactor.
    
    * When reading inbound data, read up to 4 extra bytes, and if any
      arrive (i.e. the next call has already started arriving), loop
      around to read it without putting the reactor back to sleep.
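The first bullet amounts to "wake the reactor only on the empty to non-empty transition of its task queue". A minimal C++ sketch of that idea, assuming the consumer takes the entire pending list under the lock each time it wakes; CoalescingTaskQueue and its counting WakeThread() are hypothetical stand-ins, and a real reactor would write to its eventfd instead of bumping a counter:

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a reactor's pending-task queue. WakeThread()
// only counts wakeups; a real reactor would write to its eventfd here.
class CoalescingTaskQueue {
 public:
  void Schedule(std::function<void()> task) {
    assert(task);  // a scheduled task must be callable
    bool was_empty;
    {
      std::lock_guard<std::mutex> l(lock_);
      was_empty = pending_.empty();
      pending_.push_back(std::move(task));
    }
    // Only the empty -> non-empty transition needs a wakeup: if tasks were
    // already pending, a wakeup is outstanding and the next drain will also
    // pick up this task.
    if (was_empty) {
      WakeThread();
    }
  }

  // Runs on the reactor thread after a wakeup. Taking the whole list under
  // the lock guarantees the next Schedule() sees an empty queue and issues
  // a fresh wakeup.
  int DrainAndRun() {
    std::deque<std::function<void()>> tasks;
    {
      std::lock_guard<std::mutex> l(lock_);
      tasks.swap(pending_);
    }
    for (auto& task : tasks) {
      task();
    }
    return static_cast<int>(tasks.size());
  }

  int wakeups() const { return wakeups_.load(); }

 private:
  void WakeThread() { wakeups_.fetch_add(1); }

  std::mutex lock_;
  std::deque<std::function<void()>> pending_;
  std::atomic<int> wakeups_{0};
};
```

Scheduling three tasks before the reactor drains the queue costs one wakeup instead of three; the next task scheduled after the drain triggers a new one.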
    
    The effect on context switches is clearly visible using
      rpc-bench --gtest_filter=\*Async
    
    Before:
    I0305 12:50:56.463312  7468 rpc-bench.cc:128] Ctx Sw. per req:  0.640409
    I0305 12:50:58.015260  7542 rpc-bench.cc:128] Ctx Sw. per req:  0.613172
    I0305 12:50:59.563201  7587 rpc-bench.cc:128] Ctx Sw. per req:  0.589479
    I0305 12:51:01.014848  7662 rpc-bench.cc:128] Ctx Sw. per req:  0.562744
    I0305 12:51:02.666339  7736 rpc-bench.cc:128] Ctx Sw. per req:  0.569126
    
    After:
    I0305 12:52:03.567790  9005 rpc-bench.cc:128] Ctx Sw. per req:  0.383251
    I0305 12:52:05.050909  9079 rpc-bench.cc:128] Ctx Sw. per req:  0.454404
    I0305 12:52:06.626401  9138 rpc-bench.cc:128] Ctx Sw. per req:  0.3308
    I0305 12:52:08.123154  9198 rpc-bench.cc:128] Ctx Sw. per req:  0.317752
    I0305 12:52:09.666586  9272 rpc-bench.cc:128] Ctx Sw. per req:  0.391739
    
    And on system CPU:
    
    Before:
    I0305 12:50:56.463310  7468 rpc-bench.cc:127] Sys CPU per req:  16.5524us
    I0305 12:50:58.015259  7542 rpc-bench.cc:127] Sys CPU per req:  16.1158us
    I0305 12:50:59.563199  7587 rpc-bench.cc:127] Sys CPU per req:  17.3184us
    I0305 12:51:01.014847  7662 rpc-bench.cc:127] Sys CPU per req:  16.7911us
    I0305 12:51:02.666337  7736 rpc-bench.cc:127] Sys CPU per req:  15.7659us
    
    After:
    I0305 12:52:03.567787  9005 rpc-bench.cc:127] Sys CPU per req:  13.0533us
    I0305 12:52:05.050906  9079 rpc-bench.cc:127] Sys CPU per req:  13.7925us
    I0305 12:52:06.626399  9138 rpc-bench.cc:127] Sys CPU per req:  11.6987us
    I0305 12:52:08.123152  9198 rpc-bench.cc:127] Sys CPU per req:  11.9214us
    I0305 12:52:09.666584  9272 rpc-bench.cc:127] Sys CPU per req:  13.4031us
    
    And on syscalls:
    
    todd@turbo:~/kudu$ grep recvfr /tmp/before /tmp/after
    /tmp/before:           1458969      syscalls:sys_enter_recvfrom                                     ( +-  1.99% )
    /tmp/before:           1458969      syscalls:sys_exit_recvfrom                                     ( +-  1.99% )
    /tmp/after:           1252328      syscalls:sys_enter_recvfrom                                     ( +-  1.82% )
    /tmp/after:           1252328      syscalls:sys_exit_recvfrom                                     ( +-  1.82% )
    
    todd@turbo:~/kudu$ grep epoll_ctl /tmp/before /tmp/after
    /tmp/before:            915862      syscalls:sys_enter_epoll_ctl                                     ( +-  1.47% )
    /tmp/before:            915862      syscalls:sys_exit_epoll_ctl                                     ( +-  1.47% )
    /tmp/after:            475978      syscalls:sys_enter_epoll_ctl                                     ( +-  3.61% )
    /tmp/after:            475978      syscalls:sys_exit_epoll_ctl                                     ( +-  3.61% )
    
    On a macro-benchmark (TSBS single-groupby-1-1-1 with 16 workers on an
    8-core machine), this also reduces syscalls somewhat, though the
    end-to-end improvement is minimal.
    
    Before:
    
     Performance counter stats for 'system wide' (10 runs):
    
               340,444      cs                                                            ( +-  0.30% )
               144,024      syscalls:sys_enter_recvfrom                                     ( +-  0.00% )
                94,379      syscalls:sys_enter_epoll_ctl                                     ( +-  0.06% )
               129,376      syscalls:sys_enter_epoll_wait                                     ( +-  0.10% )
    
           2.025755946 seconds time elapsed                                          ( +-  0.43% )
    
    After:
     Performance counter stats for 'system wide' (10 runs):
    
               333,865      cs                                                            ( +-  0.27% )
               119,216      syscalls:sys_enter_recvfrom                                     ( +-  0.04% )
                88,731      syscalls:sys_enter_epoll_ctl                                     ( +-  0.08% )
               104,149      syscalls:sys_enter_epoll_wait                                     ( +-  0.08% )
    
           2.005614271 seconds time elapsed                                          ( +-  0.19% )
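To read the numbers above in relative terms, the only arithmetic needed is the mean of the five rpc-bench runs and a percent change. A small C++ helper makes that explicit; Mean() and PercentReduction() are illustrative names, and the inputs are copied verbatim from the output quoted above:

```cpp
#include <cassert>
#include <cstdio>
#include <numeric>
#include <vector>

// Mean of a set of benchmark runs.
double Mean(const std::vector<double>& runs) {
  assert(!runs.empty());
  return std::accumulate(runs.begin(), runs.end(), 0.0) / runs.size();
}

// Relative reduction, in percent, when a metric goes from 'before' to 'after'.
double PercentReduction(double before, double after) {
  assert(before != 0.0);
  return 100.0 * (before - after) / before;
}

// Per-run values copied from the rpc-bench output quoted above.
const std::vector<double> kCtxSwBefore = {0.640409, 0.613172, 0.589479, 0.562744, 0.569126};
const std::vector<double> kCtxSwAfter = {0.383251, 0.454404, 0.3308, 0.317752, 0.391739};
const std::vector<double> kSysUsBefore = {16.5524, 16.1158, 17.3184, 16.7911, 15.7659};
const std::vector<double> kSysUsAfter = {13.0533, 13.7925, 11.6987, 11.9214, 13.4031};

void PrintSummary() {
  std::printf("rpc-bench ctx switches/req: %.1f%% fewer\n",
              PercentReduction(Mean(kCtxSwBefore), Mean(kCtxSwAfter)));
  std::printf("rpc-bench sys CPU/req:      %.1f%% less\n",
              PercentReduction(Mean(kSysUsBefore), Mean(kSysUsAfter)));
  std::printf("rpc-bench recvfrom calls:   %.1f%% fewer\n", PercentReduction(1458969, 1252328));
  std::printf("rpc-bench epoll_ctl calls:  %.1f%% fewer\n", PercentReduction(915862, 475978));
  std::printf("TSBS context switches:      %.1f%% fewer\n", PercentReduction(340444, 333865));
  std::printf("TSBS recvfrom calls:        %.1f%% fewer\n", PercentReduction(144024, 119216));
  std::printf("TSBS epoll_wait calls:      %.1f%% fewer\n", PercentReduction(129376, 104149));
  std::printf("TSBS elapsed time:          %.1f%% less\n",
              PercentReduction(2.025755946, 2.005614271));
}
```

With these inputs the micro-benchmark works out to roughly 37% fewer context switches and 23% less system CPU per request, and about 48% fewer epoll_ctl() calls, while the TSBS elapsed time improves by only about 1%.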
    
    Change-Id: I32c5e4d146c25be8e90665a0cb8385fcd017b15c
    Reviewed-on: http://gerrit.cloudera.org:8080/15440
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
---
 src/kudu/rpc/connection.cc | 17 ++++++-----
 src/kudu/rpc/reactor.cc    |  6 +++-
 src/kudu/rpc/transfer.cc   | 70 +++++++++++++++++++++++++++++++---------------
 src/kudu/rpc/transfer.h    | 11 ++++++--
 4 files changed, 69 insertions(+), 35 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 7dc0c03..a49dbe2 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -43,6 +43,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/slice.h"
@@ -645,11 +646,12 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
   }
   last_activity_time_ = reactor_thread_->cur_time();
 
+  faststring extra_buf;
   while (true) {
     if (!inbound_) {
       inbound_.reset(new InboundTransfer());
     }
-    Status status = inbound_->ReceiveBuffer(*socket_);
+    Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
     if (PREDICT_FALSE(!status.ok())) {
       if (status.posix_code() == ESHUTDOWN) {
         VLOG(1) << ToString() << " shut down by remote end.";
@@ -673,14 +675,11 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
       LOG(FATAL) << "Invalid direction: " << direction_;
     }
 
-    // TODO: it would seem that it would be good to loop around and see if
-    // there is more data on the socket by trying another recv(), but it turns
-    // out that it really hurts throughput to do so. A better approach
-    // might be for each InboundTransfer to actually try to read an extra byte,
-    // and if it succeeds, then we'd copy that byte into a new InboundTransfer
-    // and loop around, since it's likely the next call also arrived at the
-    // same time.
-    break;
+    if (extra_buf.size() > 0) {
+      inbound_.reset(new InboundTransfer(std::move(extra_buf)));
+    } else {
+      break;
+    }
   }
 }
 
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 231783b..e77488c 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -928,6 +928,7 @@ void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
 }
 
 void Reactor::ScheduleReactorTask(ReactorTask *task) {
+  bool was_empty;
   {
     std::unique_lock<LockType> l(lock_);
     if (closing_) {
@@ -936,9 +937,12 @@ void Reactor::ScheduleReactorTask(ReactorTask *task) {
       task->Abort(ShutdownError(false));
       return;
     }
+    was_empty = pending_tasks_.empty();
     pending_tasks_.push_back(*task);
   }
-  thread_.WakeThread();
+  if (was_empty) {
+    thread_.WakeThread();
+  }
 }
 
 bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index fb6d6a2..7693a91 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -20,6 +20,7 @@
 #include <sys/uio.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <iostream>
 #include <limits>
@@ -88,32 +89,43 @@ TransferCallbacks::~TransferCallbacks()
 {}
 
 InboundTransfer::InboundTransfer()
-  : total_length_(kMsgLengthPrefixLength),
+  : total_length_(0),
     cur_offset_(0) {
   buf_.resize(kMsgLengthPrefixLength);
 }
 
-Status InboundTransfer::ReceiveBuffer(Socket &socket) {
-  if (cur_offset_ < kMsgLengthPrefixLength) {
-    // receive uint32 length prefix
-    int32_t rem = kMsgLengthPrefixLength - cur_offset_;
-    int32_t nread;
-    Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
-    RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
-    if (nread == 0) {
-      return Status::OK();
-    }
-    DCHECK_GE(nread, 0);
-    cur_offset_ += nread;
+InboundTransfer::InboundTransfer(faststring initial_buf)
+  : buf_(std::move(initial_buf)),
+    total_length_(0),
+    cur_offset_(buf_.size()) {
+  buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
+  static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
+  if (total_length_ == 0) {
+    // We haven't yet parsed the message length. It's possible that the
+    // length is already available in the buffer passed in the constructor.
     if (cur_offset_ < kMsgLengthPrefixLength) {
-      // If we still don't have the full length prefix, we can't continue
-      // reading yet.
-      return Status::OK();
+      // receive uint32 length prefix
+      int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+      int32_t nread;
+      Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
+      RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+      if (nread == 0) {
+        return Status::OK();
+      }
+      DCHECK_GE(nread, 0);
+      cur_offset_ += nread;
+      if (cur_offset_ < kMsgLengthPrefixLength) {
+        // If we still don't have the full length prefix, we can't continue
+        // reading yet.
+        return Status::OK();
+      }
     }
-    // Since we only read 'rem' bytes above, we should now have exactly
-    // the length prefix in our buffer and no more.
-    DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
 
+    // Parse the message length out of the prefix.
+    DCHECK_GE(cur_offset_, kMsgLengthPrefixLength);
     // The length prefix doesn't include its own 4 bytes, so we have to
     // add that back in.
     total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
@@ -126,7 +138,7 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
       return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
                                              total_length_));
     }
-    buf_.resize(total_length_);
+    buf_.resize(total_length_ + kExtraReadLength);
 
     // Fall through to receive the message body, which is likely to be already
     // available on the socket.
@@ -139,12 +151,24 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
   // INT_MAX. The message will be split across multiple Recv() calls.
   // Note that this is only needed when rpc_max_message_size > INT_MAX, which is
   // currently only used for unit tests.
-  int32_t rem = std::min(total_length_ - cur_offset_,
+  int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength,
       static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
-  Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+  Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
   cur_offset_ += nread;
 
+  // We may have read some extra bytes, in which case we need to trim them off
+  // and write them into the provided buffer.
+  if (cur_offset_ >= total_length_) {
+    int64_t extra_read = cur_offset_ - total_length_;
+    DCHECK_LE(extra_read, kExtraReadLength);
+    DCHECK_GE(extra_read, 0);
+    extra_4->clear();
+    extra_4->append(&buf_[total_length_], extra_read);
+    cur_offset_ = total_length_;
+    buf_.resize(total_length_);
+  }
+
   return Status::OK();
 }
 
@@ -153,7 +177,7 @@ bool InboundTransfer::TransferStarted() const {
 }
 
 bool InboundTransfer::TransferFinished() const {
-  return cur_offset_ == total_length_;
+  return total_length_ > 0 && cur_offset_ == total_length_;
 }
 
 string InboundTransfer::StatusAsString() const {
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 0628f2c..3cab6b1 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -67,9 +67,15 @@ class InboundTransfer {
  public:
 
   InboundTransfer();
+  explicit InboundTransfer(faststring initial_buf);
 
-  // read from the socket into our buffer
-  Status ReceiveBuffer(Socket &socket);
+  // Read from the socket into our buffer.
+  //
+  // If this is the last read of the transfer (i.e. if TransferFinished() is true
+  // after this call returns OK), up to 4 extra bytes may have been read
+  // from the socket and stored in 'extra_4'. In that case, any previous content of
+  // 'extra_4' is replaced by these extra bytes.
+  Status ReceiveBuffer(Socket *socket, faststring* extra_4);
 
   // Return true if any bytes have yet been sent.
   bool TransferStarted() const;
@@ -91,6 +97,7 @@ class InboundTransfer {
 
   faststring buf_;
 
+  // 0 indicates not yet set
   uint32_t total_length_;
   uint32_t cur_offset_;
 

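The 4-byte read-ahead in InboundTransfer::ReceiveBuffer() above can be modeled in isolation. The sketch below is a simplified, hypothetical reimplementation, not Kudu code: FakeSocket stands in for a non-blocking socket, and instead of keeping a partially received frame until the next wakeup (as InboundTransfer does), it simply stops when the data runs out. Each body read asks for 4 extra bytes; whatever arrives beyond the current frame seeds the next one, so back-to-back calls are consumed in a single pass.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

constexpr size_t kPrefix = 4;  // 4-byte big-endian length prefix

// Hypothetical in-memory stand-in for a non-blocking socket.
class FakeSocket {
 public:
  explicit FakeSocket(std::string data) : data_(std::move(data)) {}
  // Copies up to 'n' available bytes into 'out'; returns how many were copied.
  size_t Recv(char* out, size_t n) {
    ++recv_calls_;
    size_t got = std::min(n, data_.size() - pos_);
    std::copy_n(data_.data() + pos_, got, out);
    pos_ += got;
    return got;
  }
  int recv_calls() const { return recv_calls_; }

 private:
  std::string data_;
  size_t pos_ = 0;
  int recv_calls_ = 0;
};

// Encodes 'payload' as a length-prefixed frame.
std::string Frame(const std::string& payload) {
  uint32_t n = static_cast<uint32_t>(payload.size());
  std::string out;
  for (int shift = 24; shift >= 0; shift -= 8) {
    out.push_back(static_cast<char>((n >> shift) & 0xff));
  }
  return out + payload;
}

// Reads frames, requesting kPrefix extra bytes with each body read. If any
// extra bytes arrive, they seed the next frame and the loop continues instead
// of going back to sleep. Stops (dropping any partial frame) when data runs out.
std::vector<std::string> ReadFramesWithReadAhead(FakeSocket* sock) {
  std::vector<std::string> frames;
  std::string extra;  // read-ahead bytes left over from the previous frame
  do {
    std::string buf = std::move(extra);
    extra.clear();
    size_t have = buf.size();
    if (have < kPrefix) {
      buf.resize(kPrefix);
      have += sock->Recv(&buf[have], kPrefix - have);
      if (have < kPrefix) break;
    }
    assert(buf.size() == kPrefix);
    uint32_t len = 0;
    for (size_t i = 0; i < kPrefix; i++) {
      len = (len << 8) | static_cast<uint8_t>(buf[i]);
    }
    size_t total = kPrefix + len;
    buf.resize(total + kPrefix);  // room for the frame plus the read-ahead bytes
    size_t cur = kPrefix + sock->Recv(&buf[kPrefix], total);  // body + kPrefix extra
    if (cur < total) break;
    extra.assign(buf, total, cur - total);  // 0..kPrefix bytes of the next frame
    frames.emplace_back(buf, kPrefix, len);
  } while (!extra.empty());
  return frames;
}
```

For two pipelined calls this issues three Recv() calls rather than the four a prefix-then-body reader would need, and both calls are handled within one wakeup.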

[kudu] 03/03: rpc: use a lighter weight completion for sync RPCs

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fefb433917252ead0c78f7f6c674050514505cba
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 5 16:09:33 2020 -0800

    rpc: use a lighter weight completion for sync RPCs
    
    This adds a new Notification class, a special-purpose CountDownLatch
    with a count of 1 implemented on top of futex. This ends up being a
    bit more efficient than the pthread-based mutex and condition
    variable.
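For readers who want to try the three-state futex protocol outside Kudu, here is a Linux-only sketch that swaps gutil's Atomic32 helpers for std::atomic<int> and calls the futex syscall directly. FutexNotification and NotifyFromAnotherThread() are hypothetical names. The sketch assumes std::atomic<int> has the same size and representation as int so that its address can serve as the futex word; that holds on mainstream Linux toolchains but is not guaranteed by the C++ standard.

```cpp
#include <atomic>
#include <cassert>
#include <climits>
#include <thread>

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>

static_assert(sizeof(std::atomic<int>) == sizeof(int), "futex word must be int-sized");

class FutexNotification {
 public:
  bool HasBeenNotified() const {
    return state_.load(std::memory_order_acquire) == kNotified;
  }

  void WaitForNotification() const {
    while (true) {
      int s = state_.load(std::memory_order_acquire);
      if (s == kNoWaiters) {
        // Advertise a waiter so that Notify() knows it must issue FUTEX_WAKE.
        int expected = kNoWaiters;
        s = state_.compare_exchange_strong(expected, kHasWaiters, std::memory_order_acquire)
                ? kHasWaiters
                : expected;
      }
      if (s == kNotified) return;
      // Sleeps only while the word still equals kHasWaiters; otherwise the
      // call returns immediately and the loop re-checks the state.
      syscall(SYS_futex, FutexWord(), FUTEX_WAIT_PRIVATE, kHasWaiters, nullptr, nullptr, 0);
    }
  }

  void Notify() {
    int prev = state_.exchange(kNotified, std::memory_order_release);
    assert(prev != kNotified && "may only notify once");
    if (prev == kHasWaiters) {
      syscall(SYS_futex, FutexWord(), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
    }
  }

 private:
  enum : int { kNoWaiters = 1, kHasWaiters = 2, kNotified = 3 };

  int* FutexWord() const { return reinterpret_cast<int*>(&state_); }

  mutable std::atomic<int> state_{kNoWaiters};
};

// Usage: block until another thread signals completion, the same
// sync-over-async shape that Proxy::SyncRequest() relies on.
bool NotifyFromAnotherThread() {
  FutexNotification note;
  std::thread notifier([&note] { note.Notify(); });
  note.WaitForNotification();
  notifier.join();
  return note.HasBeenNotified();
}
```

The uncontended path costs one atomic exchange in Notify() and one load in WaitForNotification(); the futex syscalls happen only when a waiter actually has to sleep.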
    
    Benchmarked with rpc-bench, 40 runs each before and after the change,
    and ran t-tests on the reported metrics:
    
    User CPU (statistically significant 4-8% reduction):
    
    data:  subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2
    t = -6.1821, df = 74.559, p-value = 3.081e-08
    alternative hypothesis: true difference in means is not equal to 0
    95 percent confidence interval:
     -1.5546221 -0.7968279
    sample estimates:
    mean of x mean of y
     16.96979  18.14551
    
    System CPU (no significant difference):
    
    data:  subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2
    t = -0.23148, df = 66.883, p-value = 0.8176
    alternative hypothesis: true difference in means is not equal to 0
    95 percent confidence interval:
     -0.6114441  0.4843641
    sample estimates:
    mean of x mean of y
     41.03802  41.10156
    
    Context switches (statistically significant 1.6-2.3% reduction):
    
    data:  subset(d, V1 == "with")$V2 and subset(d, V1 == "without")$V2
    t = -11.198, df = 77.282, p-value < 2.2e-16
    alternative hypothesis: true difference in means is not equal to 0
    95 percent confidence interval:
     -0.0820182 -0.0572533
    sample estimates:
    mean of x mean of y
     3.551491  3.621127
    
    Change-Id: I1b65cce8bd48ee7edf6b2d08e96d00681c32aa97
    Reviewed-on: http://gerrit.cloudera.org:8080/15441
    Tested-by: Kudu Jenkins
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/rpc/proxy.cc        |  11 ++--
 src/kudu/util/notification.h | 137 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 24668ab..1dfac01 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -26,14 +26,14 @@
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
 #include "kudu/util/net/sockaddr.h"
-#include "kudu/util/countdown_latch.h"
+#include "kudu/util/notification.h"
 #include "kudu/util/status.h"
 #include "kudu/util/user.h"
 
@@ -94,11 +94,10 @@ Status Proxy::SyncRequest(const string& method,
                           const google::protobuf::Message& req,
                           google::protobuf::Message* resp,
                           RpcController* controller) const {
-  CountDownLatch latch(1);
+  Notification note;
   AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
-               boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
-
-  latch.Wait();
+               boost::bind(&Notification::Notify, boost::ref(note)));
+  note.WaitForNotification();
   return controller->status();
 }
 
diff --git a/src/kudu/util/notification.h b/src/kudu/util/notification.h
new file mode 100644
index 0000000..b2d523d
--- /dev/null
+++ b/src/kudu/util/notification.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#ifdef __linux__
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/linux_syscall_support.h"
+#else
+#include "kudu/util/countdown_latch.h"
+#endif
+
+namespace kudu {
+
+// This class defines a `Notification` abstraction, which allows threads
+// to receive notification of a single occurrence of a single event.
+//
+// NOTE: this class is modeled after absl::Notification but re-implemented
+// to not have dependencies on other absl-specific code. If absl is ever
+// imported, this can be removed.
+//
+// The `Notification` object maintains a private boolean "notified" state that
+// transitions to `true` at most once. The `Notification` class provides the
+// following primary member functions:
+//   * `HasBeenNotified()` to query its state
+//   * `WaitForNotification*()` to have threads wait until the "notified" state
+//      is `true`.
+//   * `Notify()` to set the notification's "notified" state to `true` and
+//     notify all waiting threads that the event has occurred.
+//     This method may only be called once.
+//
+// Note that while `Notify()` may only be called once, it is perfectly valid to
+// call any of the `WaitForNotification*()` methods multiple times, from
+// multiple threads -- even after the notification's "notified" state has been
+// set -- in which case those methods will immediately return.
+//
+// Note that the lifetime of a `Notification` requires careful consideration;
+// it might not be safe to destroy a notification after calling `Notify()` since
+// it is still legal for other threads to call `WaitForNotification*()` methods
+// on the notification. However, observers responding to a "notified" state of
+// `true` can safely delete the notification without interfering with the call
+// to `Notify()` in the other thread.
+//
+// Memory ordering: For any threads X and Y, if X calls `Notify()`, then any
+// action taken by X before it calls `Notify()` is visible to thread Y after:
+//  * Y returns from `WaitForNotification()`, or
+//  * Y receives a `true` return value from `HasBeenNotified()`.
+#ifdef __linux__
+class Notification {
+ public:
+  Notification() : state_(NOT_NOTIFIED_NO_WAITERS) {}
+  ~Notification() = default;
+
+  bool HasBeenNotified() const {
+    return base::subtle::Acquire_Load(&state_) == NOTIFIED;
+  }
+
+  void WaitForNotification() const {
+    while (true) {
+      auto s = base::subtle::Acquire_Load(&state_);
+      if (s == NOT_NOTIFIED_NO_WAITERS) {
+        s = base::subtle::Acquire_CompareAndSwap(
+            &state_, NOT_NOTIFIED_NO_WAITERS, NOT_NOTIFIED_HAS_WAITERS);
+        if (s == NOT_NOTIFIED_NO_WAITERS) {
+          // We succeeded in the CAS -- sets 's' to be the new value of the
+          // state rather than the previous value.
+          s = NOT_NOTIFIED_HAS_WAITERS;
+        }
+      }
+      if (s == NOTIFIED) return;
+      DCHECK_EQ(s, NOT_NOTIFIED_HAS_WAITERS);
+      sys_futex(&state_, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, NOT_NOTIFIED_HAS_WAITERS,
+          /* timeout */ nullptr);
+    }
+  }
+
+  void Notify() {
+    auto s = base::subtle::Release_AtomicExchange(&state_, NOTIFIED);
+    DCHECK_NE(s, NOTIFIED) << "may only notify once";
+    if (s == NOT_NOTIFIED_HAS_WAITERS) {
+      sys_futex(&state_, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX,
+          nullptr /* ignored */);
+    }
+  }
+
+ private:
+  enum {
+    NOT_NOTIFIED_NO_WAITERS = 1,
+    NOT_NOTIFIED_HAS_WAITERS = 2,
+    NOTIFIED = 3
+  };
+  mutable Atomic32 state_;
+
+  DISALLOW_COPY_AND_ASSIGN(Notification);
+};
+#else
+// macOS doesn't have futex, so we just use the mutex-based latch instead.
+class Notification {
+ public:
+  Notification() : latch_(1) { }
+  ~Notification() = default;
+
+  bool HasBeenNotified() const {
+    return latch_.count() == 0;
+  }
+
+  void WaitForNotification() const {
+    latch_.Wait();
+  }
+
+  void Notify() {
+    latch_.CountDown();
+  }
+
+ private:
+  mutable CountDownLatch latch_;
+
+  DISALLOW_COPY_AND_ASSIGN(Notification);
+};
+
+#endif
+} // namespace kudu


[kudu] 01/03: [ranger] pass 'principal' and 'keytab' to the subprocess

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 802623741e163333e20ad57e059f4ce20701973e
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Wed Mar 11 16:57:23 2020 -0700

    [ranger] pass 'principal' and 'keytab' to the subprocess
    
    This patch adds the C++ side change to pass the Kudu principal and
    keytab to the Java Ranger subprocess.
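The resulting command line can be sketched without Kudu's flag machinery. Everything here is hypothetical scaffolding: RangerFlags stands in for --ranger_java_path, the jar/config classpath, --principal and --keytab_file, and the principal is assumed to be already resolved (the real code uses security::GetConfiguredPrincipal() for that). Only the main class name and the "-i <principal> -k <keytab>" suffix come from the diff.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical bundle of the values the real code reads from gflags.
struct RangerFlags {
  std::string java_path;    // java binary to run
  std::string classpath;    // "<jar path>:<ranger config path>"
  std::string principal;    // already-resolved Kerberos principal
  std::string keytab_file;  // empty when Kerberos is disabled
};

// Builds the subprocess command line. The principal and keytab are appended
// only when a keytab is configured, mirroring the BuildArgv() logic in the diff.
std::vector<std::string> BuildRangerArgv(const RangerFlags& flags) {
  std::vector<std::string> argv = {
      flags.java_path, "-cp", flags.classpath,
      "org.apache.kudu.subprocess.ranger.RangerSubprocessMain"};
  if (!flags.keytab_file.empty()) {
    assert(!flags.principal.empty());
    argv.push_back("-i");
    argv.push_back(flags.principal);
    argv.push_back("-k");
    argv.push_back(flags.keytab_file);
  }
  return argv;
}
```

Without a keytab the subprocess gets the four base arguments; with Kerberos enabled it also receives the identity to log in with.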
    
    Change-Id: Ie30b835b6d44ddb51d95c587f1329bfefebeb37c
    Reviewed-on: http://gerrit.cloudera.org:8080/15416
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Hao Hao <ha...@cloudera.com>
---
 src/kudu/ranger/CMakeLists.txt   |  3 +-
 src/kudu/ranger/ranger_client.cc | 90 ++++++++++++++++++++++++++++------------
 src/kudu/ranger/ranger_client.h  | 18 ++++----
 src/kudu/security/init.cc        | 54 ++++++++++++++----------
 src/kudu/security/init.h         |  6 +++
 src/kudu/server/server_base.cc   | 18 +-------
 6 files changed, 113 insertions(+), 76 deletions(-)

diff --git a/src/kudu/ranger/CMakeLists.txt b/src/kudu/ranger/CMakeLists.txt
index d735f33..f486a19 100644
--- a/src/kudu/ranger/CMakeLists.txt
+++ b/src/kudu/ranger/CMakeLists.txt
@@ -41,7 +41,8 @@ set(RANGER_SRCS
 set(RANGER_DEPS
   gflags
   kudu_subprocess
-  ranger_proto)
+  ranger_proto
+  security)
 
 add_library(kudu_ranger ${RANGER_SRCS})
 target_link_libraries(kudu_ranger ${RANGER_DEPS})
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 7899b6e..73e17d2 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -31,6 +31,7 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/ranger/ranger.pb.h"
+#include "kudu/security/init.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flag_validators.h"
@@ -117,6 +118,9 @@ METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
     kudu::MetricLevel::kInfo,
     60000LU, 1);
 
+DECLARE_string(keytab_file);
+DECLARE_string(principal);
+
 namespace kudu {
 namespace ranger {
 
@@ -127,13 +131,50 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+static const char* kUnauthorizedAction = "Unauthorized action";
+static const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. "
+                                                 "Use 'kudu table rename_table' to rename it to "
+                                                 "a Ranger-compatible name.";
+static const char* kMainClass = "org.apache.kudu.subprocess.ranger.RangerSubprocessMain";
+
 // Returns the path to the JAR file containing the Ranger subprocess.
 static string GetRangerJarPath() {
-  string exe;
-  CHECK_OK(Env::Default()->GetExecutablePath(&exe));
-  const string bin_dir = DirName(exe);
-  return FLAGS_ranger_jar_path.empty() ? JoinPathSegments(bin_dir, "kudu-subprocess.jar") :
-         FLAGS_ranger_jar_path;
+  if (FLAGS_ranger_jar_path.empty()) {
+    string exe;
+    CHECK_OK(Env::Default()->GetExecutablePath(&exe));
+    const string bin_dir = DirName(exe);
+    return JoinPathSegments(bin_dir, "kudu-subprocess.jar");
+  }
+  return FLAGS_ranger_jar_path;
+}
+
+// Returns the classpath to be used for the Ranger subprocess.
+static string GetJavaClasspath() {
+  return Substitute("$0:$1", GetRangerJarPath(), FLAGS_ranger_config_path);
+}
+
+// Builds the arguments for the Ranger subprocess. Specifically, passes
+// the principal and keytab file that the Ranger subprocess will log in with
+// if Kerberos is enabled. 'argv' holds the final arguments.
+// Returns 'OK' if the arguments were successfully created, an error otherwise.
+static Status BuildArgv(vector<string>* argv) {
+  DCHECK(argv);
+  // Pass the required arguments to run the Ranger subprocess.
+  vector<string> ret = { FLAGS_ranger_java_path, "-cp", GetJavaClasspath(), kMainClass };
+  // When Kerberos is enabled in Kudu, pass both Kudu principal and keytab file
+  // to the Ranger subprocess.
+  if (!FLAGS_keytab_file.empty()) {
+    string configured_principal;
+    RETURN_NOT_OK_PREPEND(security::GetConfiguredPrincipal(FLAGS_principal, &configured_principal),
+                          "unable to get the configured principal for the Ranger subprocess");
+    ret.emplace_back("-i");
+    ret.emplace_back(std::move(configured_principal));
+    ret.emplace_back("-k");
+    ret.emplace_back(FLAGS_keytab_file);
+  }
+
+  *argv = std::move(ret);
+  return Status::OK();
 }
 
 static bool ValidateRangerConfiguration() {
@@ -145,14 +186,14 @@ static bool ValidateRangerConfiguration() {
       string p;
       Status s = Subprocess::Call({ "which", FLAGS_ranger_java_path }, "", &p);
       if (!s.ok()) {
-        LOG(ERROR) << Substitute("FLAGS_ranger_java_path has invalid java binary path: $0",
+        LOG(ERROR) << Substitute("--ranger_java_path has invalid java binary path: $0",
                                  FLAGS_ranger_java_path);
         return false;
       }
     }
     string ranger_jar_path = GetRangerJarPath();
     if (!Env::Default()->FileExists(ranger_jar_path)) {
-      LOG(ERROR) << Substitute("FLAGS_ranger_jar_path has invalid JAR file path: $0",
+      LOG(ERROR) << Substitute("--ranger_jar_path has invalid JAR file path: $0",
                                ranger_jar_path);
       return false;
     }
@@ -161,12 +202,6 @@ static bool ValidateRangerConfiguration() {
 }
 GROUP_FLAG_VALIDATOR(ranger_config_flags, ValidateRangerConfiguration);
 
-static const char* kUnauthorizedAction = "Unauthorized action";
-static const char* kDenyNonRangerTableTemplate = "Denying action on table with invalid name $0. "
-                                                 "Use 'kudu table rename_table' to rename it to "
-                                                 "a Ranger-compatible name.";
-const char* kMainClass = "org.apache.kudu.subprocess.ranger.RangerSubprocessMain";
-
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
   HISTINIT(sp_inbound_queue_length, ranger_subprocess_inbound_queue_length);
@@ -181,19 +216,24 @@ RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntit
 }
 #undef HISTINIT
 
-RangerClient::RangerClient(const scoped_refptr<MetricEntity>& metric_entity) :
-  subprocess_({ FLAGS_ranger_java_path, "-cp", GetJavaClasspath(), kMainClass },
-              metric_entity) {}
+RangerClient::RangerClient(const scoped_refptr<MetricEntity>& metric_entity)
+    : metric_entity_(metric_entity) {
+  DCHECK(metric_entity);
+}
 
 Status RangerClient::Start() {
   VLOG(1) << "Initializing Ranger subprocess server";
-  return subprocess_.Start();
+  vector<string> argv;
+  RETURN_NOT_OK(BuildArgv(&argv));
+  subprocess_.reset(new RangerSubprocess(std::move(argv), metric_entity_));
+  return subprocess_->Start();
 }
 
 // TODO(abukor): refactor to avoid code duplication
 Status RangerClient::AuthorizeAction(const string& user_name,
                                      const ActionPB& action,
                                      const string& table_name) {
+  DCHECK(subprocess_);
   string db;
   Slice tbl;
 
@@ -213,7 +253,7 @@ Status RangerClient::AuthorizeAction(const string& user_name,
   req->set_database(db);
   req->set_table(tbl.ToString());
 
-  RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+  RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
 
   CHECK_EQ(1, resp_list.responses_size());
   if (resp_list.responses().begin()->allowed()) {
@@ -229,6 +269,7 @@ Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
                                                     const ActionPB& action,
                                                     const string& table_name,
                                                     unordered_set<string>* column_names) {
+  DCHECK(subprocess_);
   DCHECK(!column_names->empty());
 
   string db;
@@ -252,7 +293,7 @@ Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
     req->set_column(col);
   }
 
-  RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+  RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
 
   DCHECK_EQ(column_names->size(), resp_list.responses_size());
 
@@ -277,6 +318,7 @@ Status RangerClient::AuthorizeActionMultipleColumns(const string& user_name,
 Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
                                                    const ActionPB& action,
                                                    unordered_set<string>* table_names) {
+  DCHECK(subprocess_);
   if (table_names->empty()) {
     return Status::InvalidArgument("Empty set of tables");
   }
@@ -304,7 +346,7 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
     }
   }
 
-  RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+  RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
 
   DCHECK_EQ(orig_table_names.size(), resp_list.responses_size());
 
@@ -329,6 +371,7 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
 Status RangerClient::AuthorizeActions(const string& user_name,
                                       const string& table_name,
                                       unordered_set<ActionPB, ActionHash>* actions) {
+  DCHECK(subprocess_);
   DCHECK(!actions->empty());
 
   string db;
@@ -351,7 +394,7 @@ Status RangerClient::AuthorizeActions(const string& user_name,
     req->set_table(tbl.ToString());
   }
 
-  RETURN_NOT_OK(subprocess_.Execute(req_list, &resp_list));
+  RETURN_NOT_OK(subprocess_->Execute(req_list, &resp_list));
 
   DCHECK_EQ(actions->size(), resp_list.responses_size());
 
@@ -372,10 +415,5 @@ Status RangerClient::AuthorizeActions(const string& user_name,
 
   return Status::OK();
 }
-
-string RangerClient::GetJavaClasspath() {
-  return Substitute("$0:$1", GetRangerJarPath(), FLAGS_ranger_config_path);
-}
-
 } // namespace ranger
 } // namespace kudu
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index c1f091e..d6e920b 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -27,12 +27,11 @@
 #include "kudu/ranger/ranger.pb.h"
 #include "kudu/subprocess/server.h"
 #include "kudu/subprocess/subprocess_proxy.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class MetricEntity;
-
 namespace ranger {
 
 struct ActionHash {
@@ -91,17 +90,16 @@ class RangerClient {
 
   // Replaces the subprocess server in the subprocess proxy.
   void ReplaceServerForTests(std::unique_ptr<subprocess::SubprocessServer> server) {
-    subprocess_.ReplaceServerForTests(std::move(server));
+    // Creates a dummy RangerSubprocess if it is not initialized.
+    if (!subprocess_) {
+      subprocess_.reset(new RangerSubprocess({""}, metric_entity_));
+    }
+    subprocess_->ReplaceServerForTests(std::move(server));
   }
 
  private:
-  // Sends request to the subprocess and parses the response.
-  Status SendRequest(RangerRequestListPB* req, RangerResponseListPB* resp) WARN_UNUSED_RESULT;
-
-  // Returns classpath to be used for the Ranger subprocess.
-  static std::string GetJavaClasspath();
-
-  RangerSubprocess subprocess_;
+  std::unique_ptr<RangerSubprocess> subprocess_;
+  scoped_refptr<MetricEntity> metric_entity_;
 };
 
 } // namespace ranger
diff --git a/src/kudu/security/init.cc b/src/kudu/security/init.cc
index e740fcb..1268386 100644
--- a/src/kudu/security/init.cc
+++ b/src/kudu/security/init.cc
@@ -73,6 +73,21 @@ DEFINE_bool(use_system_auth_to_local, kDefaultSystemAuthToLocal,
             "'kudu/foo.example.com@EXAMPLE' will map to 'kudu'.");
 TAG_FLAG(use_system_auth_to_local, advanced);
 
+DEFINE_string(principal, "kudu/_HOST",
+              "Kerberos principal that this daemon will log in as. The special token "
+              "_HOST will be replaced with the FQDN of the local host.");
+TAG_FLAG(principal, experimental);
+// This is currently tagged as unsafe because there is no way for users to configure
+// clients to expect a non-default principal. As such, configuring a server to login
+// as a different one would end up with a cluster that can't be connected to.
+// See KUDU-1884.
+TAG_FLAG(principal, unsafe);
+
+DEFINE_string(keytab_file, "",
+              "Path to the Kerberos Keytab file for this server. Specifying a "
+              "keytab file will cause the server to kinit, and enable Kerberos "
+              "to be used to authenticate RPC connections.");
+TAG_FLAG(keytab_file, stable);
 
 using std::mt19937;
 using std::random_device;
@@ -363,29 +378,6 @@ Status KinitContext::KinitInternal() {
   return Status::OK();
 }
 
-namespace {
-// 'in_principal' is the user specified principal to use with Kerberos. It may have a token
-// in the string of the form '_HOST', which if present, needs to be replaced with the FQDN of the
-// current host.
-// 'out_principal' has the final principal with which one may Kinit.
-Status GetConfiguredPrincipal(const std::string& in_principal, string* out_principal) {
-  *out_principal = in_principal;
-  const auto& kHostToken = "_HOST";
-  if (in_principal.find(kHostToken) != string::npos) {
-    string hostname;
-    // Try to fill in either the FQDN or hostname.
-    if (!GetFQDN(&hostname).ok()) {
-      RETURN_NOT_OK(GetHostname(&hostname));
-    }
-    // Hosts in principal names are canonicalized to lower-case.
-    std::transform(hostname.begin(), hostname.end(), hostname.begin(), tolower);
-    GlobalReplaceSubstring(kHostToken, hostname, out_principal);
-  }
-  return Status::OK();
-}
-} // anonymous namespace
-
-
 RWMutex* KerberosReinitLock() {
   return g_kerberos_reinit_lock;
 }
@@ -444,6 +436,22 @@ Status MapPrincipalToLocalName(const std::string& principal, std::string* local_
   return Status::OK();
 }
 
+Status GetConfiguredPrincipal(const string& in_principal, string* out_principal) {
+  *out_principal = in_principal;
+  static const auto& kHostToken = "_HOST";
+  if (in_principal.find(kHostToken) != string::npos) {
+    string hostname;
+    // Try to fill in either the FQDN or hostname.
+    if (!GetFQDN(&hostname).ok()) {
+      RETURN_NOT_OK(GetHostname(&hostname));
+    }
+    // Hosts in principal names are canonicalized to lower-case.
+    std::transform(hostname.begin(), hostname.end(), hostname.begin(), tolower);
+    GlobalReplaceSubstring(kHostToken, hostname, out_principal);
+  }
+  return Status::OK();
+}
+
 boost::optional<string> GetLoggedInPrincipalFromKeytab() {
   if (!g_kinit_ctx) return boost::none;
   return g_kinit_ctx->principal_str();
diff --git a/src/kudu/security/init.h b/src/kudu/security/init.h
index 80074b3..31dba47 100644
--- a/src/kudu/security/init.h
+++ b/src/kudu/security/init.h
@@ -86,5 +86,11 @@ Status CanonicalizeKrb5Principal(std::string* principal);
 // exist yet, and trying to avoid rebase pain).
 Status MapPrincipalToLocalName(const std::string& principal, std::string* local_name);
 
+// Get the configured principal. 'in_principal' is the user specified principal to use with
+// Kerberos. It may have a token in the string of the form '_HOST', which if present, needs
+// to be replaced with the FQDN of the current host. 'out_principal' has the final principal
+// with which one may Kinit.
+Status GetConfiguredPrincipal(const std::string& in_principal, std::string* out_principal);
+
 } // namespace security
 } // namespace kudu
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 828b3b2..754c5a5 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -126,22 +126,6 @@ DEFINE_string(user_acl, "*",
 TAG_FLAG(user_acl, stable);
 TAG_FLAG(user_acl, sensitive);
 
-DEFINE_string(principal, "kudu/_HOST",
-              "Kerberos principal that this daemon will log in as. The special token "
-              "_HOST will be replaced with the FQDN of the local host.");
-TAG_FLAG(principal, experimental);
-// This is currently tagged as unsafe because there is no way for users to configure
-// clients to expect a non-default principal. As such, configuring a server to login
-// as a different one would end up with a cluster that can't be connected to.
-// See KUDU-1884.
-TAG_FLAG(principal, unsafe);
-
-DEFINE_string(keytab_file, "",
-              "Path to the Kerberos Keytab file for this server. Specifying a "
-              "keytab file will cause the server to kinit, and enable Kerberos "
-              "to be used to authenticate RPC connections.");
-TAG_FLAG(keytab_file, stable);
-
 DEFINE_bool(allow_world_readable_credentials, false,
             "Enable the use of keytab files and TLS private keys with "
             "world-readable permissions.");
@@ -226,6 +210,8 @@ DECLARE_int32(dns_resolver_max_threads_num);
 DECLARE_uint32(dns_resolver_cache_capacity_mb);
 DECLARE_uint32(dns_resolver_cache_ttl_sec);
 DECLARE_string(log_filename);
+DECLARE_string(keytab_file);
+DECLARE_string(principal);
 
 METRIC_DECLARE_gauge_size(merged_entities_count_of_server);