Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/20 04:51:53 UTC

[kudu] 02/03: rpc: reduce context switches and receive calls

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1a21e92057f61bd3b0581ac590a84bc7d7a21ac6
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 5 12:49:07 2020 -0800

    rpc: reduce context switches and receive calls
    
    * When queueing a task to execute on the reactor, avoid writing to the
      eventfd to wake it up if such a write has already been done. This
      should reduce the number of read/write syscalls to the eventfd and
      avoid "spurious" wakeups of the reactor (see the first sketch below).
    
    * When reading inbound data, read an extra 4 bytes, and if they are
      available, loop around to read another call without putting the
      reactor back to sleep (see the second sketch below).
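
    First sketch: a minimal, self-contained illustration of the wakeup
    coalescing, with made-up class and member names; the real change is in
    src/kudu/rpc/reactor.cc below. The producer writes to the eventfd only
    when the task queue transitions from empty to non-empty, since a wakeup
    is already pending for anything queued behind the first task.

      // Illustrative only: coalesce eventfd wakeups on the empty ->
      // non-empty transition of a task queue.
      #include <sys/eventfd.h>
      #include <unistd.h>

      #include <cstdint>
      #include <deque>
      #include <functional>
      #include <mutex>

      class CoalescingTaskQueue {
       public:
        CoalescingTaskQueue() : event_fd_(eventfd(0, 0)) {}
        ~CoalescingTaskQueue() { close(event_fd_); }

        // Producer side: enqueue a task, waking the consumer only if the
        // queue was empty.
        void Schedule(std::function<void()> task) {
          bool was_empty;
          {
            std::lock_guard<std::mutex> l(lock_);
            was_empty = tasks_.empty();
            tasks_.push_back(std::move(task));
          }
          if (was_empty) {
            // If the queue already had entries, a wakeup is pending and
            // another write would only add syscalls and spurious wakeups.
            uint64_t one = 1;
            ssize_t unused = write(event_fd_, &one, sizeof(one));
            (void)unused;
          }
        }

        // Consumer side: called when event_fd() polls readable. Consume
        // the pending wakeup, then drain and run every queued task.
        void DrainAndRun() {
          uint64_t count;
          ssize_t unused = read(event_fd_, &count, sizeof(count));
          (void)unused;
          std::deque<std::function<void()>> drained;
          {
            std::lock_guard<std::mutex> l(lock_);
            drained.swap(tasks_);
          }
          for (auto& t : drained) {
            t();
          }
        }

        // Register this fd with the event loop (epoll/libev) for reads.
        int event_fd() const { return event_fd_; }

       private:
        const int event_fd_;
        std::mutex lock_;
        std::deque<std::function<void()>> tasks_;
      };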
    
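    Second sketch: the over-read idea as a stand-alone, blocking-socket
    helper with made-up names; the real, non-blocking implementation is in
    src/kudu/rpc/transfer.cc below. Each recv() of the message body asks
    for up to 4 bytes past the end of the current frame, and any surplus is
    handed back in 'carry' so the caller can start parsing the next call
    instead of going back to sleep in the poller.

      // Illustrative only: read one 4-byte length-prefixed frame,
      // over-reading by up to 4 bytes and carrying the surplus back.
      #include <arpa/inet.h>
      #include <sys/socket.h>
      #include <sys/types.h>

      #include <algorithm>
      #include <cstdint>
      #include <cstring>
      #include <string>

      bool ReadFrame(int fd, std::string* carry, std::string* frame) {
        constexpr size_t kPrefix = 4;         // big-endian length prefix
        std::string buf = std::move(*carry);  // bytes left from last call
        carry->clear();

        // Complete the length prefix (the carry may already cover it).
        while (buf.size() < kPrefix) {
          char tmp[kPrefix];
          ssize_t n = recv(fd, tmp, kPrefix - buf.size(), 0);
          if (n <= 0) return false;           // EOF or error
          buf.append(tmp, n);
        }
        uint32_t body_len;
        memcpy(&body_len, buf.data(), kPrefix);
        const size_t total = kPrefix + ntohl(body_len);

        // Read the body, requesting up to kPrefix extra bytes each time.
        while (buf.size() < total) {
          char tmp[8192];
          size_t want = std::min(sizeof(tmp), total + kPrefix - buf.size());
          ssize_t n = recv(fd, tmp, want, 0);
          if (n <= 0) return false;
          buf.append(tmp, n);
        }

        // Anything past 'total' is the start of the next frame.
        carry->assign(buf, total, buf.size() - total);
        frame->assign(buf, 0, total);
        return true;
      }

    A caller can then keep looping, e.g.
      std::string carry, frame;
      while (ReadFrame(fd, &carry, &frame)) { /* dispatch the call */ }
    so that a call which arrived back to back with the previous one is
    parsed without another trip through epoll_wait.
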
    The effect on context switches is clearly visible using
      rpc-bench --gtest_filter=\*Async
    
    Before:
    I0305 12:50:56.463312  7468 rpc-bench.cc:128] Ctx Sw. per req:  0.640409
    I0305 12:50:58.015260  7542 rpc-bench.cc:128] Ctx Sw. per req:  0.613172
    I0305 12:50:59.563201  7587 rpc-bench.cc:128] Ctx Sw. per req:  0.589479
    I0305 12:51:01.014848  7662 rpc-bench.cc:128] Ctx Sw. per req:  0.562744
    I0305 12:51:02.666339  7736 rpc-bench.cc:128] Ctx Sw. per req:  0.569126
    
    After:
    I0305 12:52:03.567790  9005 rpc-bench.cc:128] Ctx Sw. per req:  0.383251
    I0305 12:52:05.050909  9079 rpc-bench.cc:128] Ctx Sw. per req:  0.454404
    I0305 12:52:06.626401  9138 rpc-bench.cc:128] Ctx Sw. per req:  0.3308
    I0305 12:52:08.123154  9198 rpc-bench.cc:128] Ctx Sw. per req:  0.317752
    I0305 12:52:09.666586  9272 rpc-bench.cc:128] Ctx Sw. per req:  0.391739
    
    And on system CPU:
    
    Before:
    I0305 12:50:56.463310  7468 rpc-bench.cc:127] Sys CPU per req:  16.5524us
    I0305 12:50:58.015259  7542 rpc-bench.cc:127] Sys CPU per req:  16.1158us
    I0305 12:50:59.563199  7587 rpc-bench.cc:127] Sys CPU per req:  17.3184us
    I0305 12:51:01.014847  7662 rpc-bench.cc:127] Sys CPU per req:  16.7911us
    I0305 12:51:02.666337  7736 rpc-bench.cc:127] Sys CPU per req:  15.7659us
    
    After:
    I0305 12:52:03.567787  9005 rpc-bench.cc:127] Sys CPU per req:  13.0533us
    I0305 12:52:05.050906  9079 rpc-bench.cc:127] Sys CPU per req:  13.7925us
    I0305 12:52:06.626399  9138 rpc-bench.cc:127] Sys CPU per req:  11.6987us
    I0305 12:52:08.123152  9198 rpc-bench.cc:127] Sys CPU per req:  11.9214us
    I0305 12:52:09.666584  9272 rpc-bench.cc:127] Sys CPU per req:  13.4031us
    
    And on syscalls:
    
    todd@turbo:~/kudu$ grep recvfr /tmp/before /tmp/after
    /tmp/before:           1458969      syscalls:sys_enter_recvfrom                                     ( +-  1.99% )
    /tmp/before:           1458969      syscalls:sys_exit_recvfrom                                     ( +-  1.99% )
    /tmp/after:           1252328      syscalls:sys_enter_recvfrom                                     ( +-  1.82% )
    /tmp/after:           1252328      syscalls:sys_exit_recvfrom                                     ( +-  1.82% )
    
    todd@turbo:~/kudu$ grep epoll_ctl /tmp/before /tmp/after
    /tmp/before:            915862      syscalls:sys_enter_epoll_ctl                                     ( +-  1.47% )
    /tmp/before:            915862      syscalls:sys_exit_epoll_ctl                                     ( +-  1.47% )
    /tmp/after:            475978      syscalls:sys_enter_epoll_ctl                                     ( +-  3.61% )
    /tmp/after:            475978      syscalls:sys_exit_epoll_ctl                                     ( +-  3.61% )
    
    On a more macro-benchmark (TSBS single-groupby-1-1-1 16 workers on an
    8-core machine) this also reduces syscalls a bit, though the end-to-end
    improvement is minimal.
    
    Before:
    
     Performance counter stats for 'system wide' (10 runs):
    
               340,444      cs                                                            ( +-  0.30% )
               144,024      syscalls:sys_enter_recvfrom                                     ( +-  0.00% )
                94,379      syscalls:sys_enter_epoll_ctl                                     ( +-  0.06% )
               129,376      syscalls:sys_enter_epoll_wait                                     ( +-  0.10% )
    
           2.025755946 seconds time elapsed                                          ( +-  0.43% )
    
    After:
     Performance counter stats for 'system wide' (10 runs):
    
               333,865      cs                                                            ( +-  0.27% )
               119,216      syscalls:sys_enter_recvfrom                                     ( +-  0.04% )
                88,731      syscalls:sys_enter_epoll_ctl                                     ( +-  0.08% )
               104,149      syscalls:sys_enter_epoll_wait                                     ( +-  0.08% )
    
           2.005614271 seconds time elapsed                                          ( +-  0.19% )
    
    Change-Id: I32c5e4d146c25be8e90665a0cb8385fcd017b15c
    Reviewed-on: http://gerrit.cloudera.org:8080/15440
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
---
 src/kudu/rpc/connection.cc | 17 ++++++-----
 src/kudu/rpc/reactor.cc    |  6 +++-
 src/kudu/rpc/transfer.cc   | 70 +++++++++++++++++++++++++++++++---------------
 src/kudu/rpc/transfer.h    | 11 ++++++--
 4 files changed, 69 insertions(+), 35 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 7dc0c03..a49dbe2 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -43,6 +43,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/slice.h"
@@ -645,11 +646,12 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
   }
   last_activity_time_ = reactor_thread_->cur_time();
 
+  faststring extra_buf;
   while (true) {
     if (!inbound_) {
       inbound_.reset(new InboundTransfer());
     }
-    Status status = inbound_->ReceiveBuffer(*socket_);
+    Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
     if (PREDICT_FALSE(!status.ok())) {
       if (status.posix_code() == ESHUTDOWN) {
         VLOG(1) << ToString() << " shut down by remote end.";
@@ -673,14 +675,11 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
       LOG(FATAL) << "Invalid direction: " << direction_;
     }
 
-    // TODO: it would seem that it would be good to loop around and see if
-    // there is more data on the socket by trying another recv(), but it turns
-    // out that it really hurts throughput to do so. A better approach
-    // might be for each InboundTransfer to actually try to read an extra byte,
-    // and if it succeeds, then we'd copy that byte into a new InboundTransfer
-    // and loop around, since it's likely the next call also arrived at the
-    // same time.
-    break;
+    if (extra_buf.size() > 0) {
+      inbound_.reset(new InboundTransfer(std::move(extra_buf)));
+    } else {
+      break;
+    }
   }
 }
 
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 231783b..e77488c 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -928,6 +928,7 @@ void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
 }
 
 void Reactor::ScheduleReactorTask(ReactorTask *task) {
+  bool was_empty;
   {
     std::unique_lock<LockType> l(lock_);
     if (closing_) {
@@ -936,9 +937,12 @@ void Reactor::ScheduleReactorTask(ReactorTask *task) {
       task->Abort(ShutdownError(false));
       return;
     }
+    was_empty = pending_tasks_.empty();
     pending_tasks_.push_back(*task);
   }
-  thread_.WakeThread();
+  if (was_empty) {
+    thread_.WakeThread();
+  }
 }
 
 bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index fb6d6a2..7693a91 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -20,6 +20,7 @@
 #include <sys/uio.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <iostream>
 #include <limits>
@@ -88,32 +89,43 @@ TransferCallbacks::~TransferCallbacks()
 {}
 
 InboundTransfer::InboundTransfer()
-  : total_length_(kMsgLengthPrefixLength),
+  : total_length_(0),
     cur_offset_(0) {
   buf_.resize(kMsgLengthPrefixLength);
 }
 
-Status InboundTransfer::ReceiveBuffer(Socket &socket) {
-  if (cur_offset_ < kMsgLengthPrefixLength) {
-    // receive uint32 length prefix
-    int32_t rem = kMsgLengthPrefixLength - cur_offset_;
-    int32_t nread;
-    Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
-    RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
-    if (nread == 0) {
-      return Status::OK();
-    }
-    DCHECK_GE(nread, 0);
-    cur_offset_ += nread;
+InboundTransfer::InboundTransfer(faststring initial_buf)
+  : buf_(std::move(initial_buf)),
+    total_length_(0),
+    cur_offset_(buf_.size()) {
+  buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
+  static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
+  if (total_length_ == 0) {
+    // We haven't yet parsed the message length. It's possible that the
+    // length is already available in the buffer passed in the constructor.
     if (cur_offset_ < kMsgLengthPrefixLength) {
-      // If we still don't have the full length prefix, we can't continue
-      // reading yet.
-      return Status::OK();
+      // receive uint32 length prefix
+      int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+      int32_t nread;
+      Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
+      RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+      if (nread == 0) {
+        return Status::OK();
+      }
+      DCHECK_GE(nread, 0);
+      cur_offset_ += nread;
+      if (cur_offset_ < kMsgLengthPrefixLength) {
+        // If we still don't have the full length prefix, we can't continue
+        // reading yet.
+        return Status::OK();
+      }
     }
-    // Since we only read 'rem' bytes above, we should now have exactly
-    // the length prefix in our buffer and no more.
-    DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
 
+    // Parse the message length out of the prefix.
+    DCHECK_GE(cur_offset_, kMsgLengthPrefixLength);
     // The length prefix doesn't include its own 4 bytes, so we have to
     // add that back in.
     total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
@@ -126,7 +138,7 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
       return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
                                              total_length_));
     }
-    buf_.resize(total_length_);
+    buf_.resize(total_length_ + kExtraReadLength);
 
     // Fall through to receive the message body, which is likely to be already
     // available on the socket.
@@ -139,12 +151,24 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
   // INT_MAX. The message will be split across multiple Recv() calls.
   // Note that this is only needed when rpc_max_message_size > INT_MAX, which is
   // currently only used for unit tests.
-  int32_t rem = std::min(total_length_ - cur_offset_,
+  int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength,
       static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
-  Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+  Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
   RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
   cur_offset_ += nread;
 
+  // We may have read some extra bytes, in which case we need to trim them off
+  // and write them into the provided buffer.
+  if (cur_offset_ >= total_length_) {
+    int64_t extra_read = cur_offset_ - total_length_;
+    DCHECK_LE(extra_read, kExtraReadLength);
+    DCHECK_GE(extra_read, 0);
+    extra_4->clear();
+    extra_4->append(&buf_[total_length_], extra_read);
+    cur_offset_ = total_length_;
+    buf_.resize(total_length_);
+  }
+
   return Status::OK();
 }
 
@@ -153,7 +177,7 @@ bool InboundTransfer::TransferStarted() const {
 }
 
 bool InboundTransfer::TransferFinished() const {
-  return cur_offset_ == total_length_;
+  return total_length_ > 0 && cur_offset_ == total_length_;
 }
 
 string InboundTransfer::StatusAsString() const {
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 0628f2c..3cab6b1 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -67,9 +67,15 @@ class InboundTransfer {
  public:
 
   InboundTransfer();
+  explicit InboundTransfer(faststring initial_buf);
 
-  // read from the socket into our buffer
-  Status ReceiveBuffer(Socket &socket);
+  // Read from the socket into our buffer.
+  //
+  // If this is the last read of the transfer (i.e. if TransferFinished() is true
+  // after this call returns OK), up to 4 extra bytes may have been read
+  // from the socket and stored in 'extra_4'. In that case, any previous content of
+  // 'extra_4' is replaced by these extra bytes.
+  Status ReceiveBuffer(Socket *socket, faststring* extra_4);
 
   // Return true if any bytes have yet been sent.
   bool TransferStarted() const;
@@ -91,6 +97,7 @@ class InboundTransfer {
 
   faststring buf_;
 
+  // 0 indicates not yet set
   uint32_t total_length_;
   uint32_t cur_offset_;