Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/14 15:33:05 UTC

[kudu] branch master updated: rpc: avoid an extra epoll cycle when data can be sent immediately

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 57a4ea3  rpc: avoid an extra epoll cycle when data can be sent immediately
57a4ea3 is described below

commit 57a4ea30931dc140c3cff25623cd0bd664122321
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Sat Apr 11 22:19:14 2020 -0700

    rpc: avoid an extra epoll cycle when data can be sent immediately
    
    Prior to this patch, when queueing outbound data in the reactor thread,
    we would always enqueue the transfer, set the epoll watcher on the
    socket to wait for the "writable" state, and then go back into the epoll
    loop. In most cases, there is sufficient buffer space, so the next epoll
    loop would return immediately. We'd then perform the write, see nothing
    left in the transfer queue, and disable the "write" watcher again.
    
    This whole dance would result in a sequence of epoll_ctl / epoll_wait /
    send / epoll_ctl for every outbound message.
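    
    As a rough illustration only (plain epoll and send(), not the actual
    libev-based code in Connection), the old per-message sequence at the
    syscall level looked something like:
    
      #include <sys/epoll.h>
      #include <sys/socket.h>
    
      // Hedged sketch of the old behavior: arm the EPOLLOUT watcher, wait
      // for it to fire (usually immediately), write, then disarm the
      // watcher again. Error handling omitted for brevity.
      static void SendOneMessageOld(int epfd, int fd, const void* buf, size_t len) {
        struct epoll_event ev;
        ev.events = EPOLLOUT;
        ev.data.fd = fd;
        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);   // epoll_ctl #1
        struct epoll_event ready;
        epoll_wait(epfd, &ready, 1, -1);           // typically returns at once
        send(fd, buf, len, 0);                     // the actual write
        ev.events = 0;
        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);   // epoll_ctl #2
      }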
    
    This patch changes the behavior so that, when we enqueue a transfer, if
    the transfer queue was previously empty (i.e., we hadn't gotten blocked on
    another transfer), we now try writing it immediately. Only if it fails
    to fully write do we enable the epoll watching. This saves a pair of
    epoll_ctl calls and an epoll_wait for both the request and response
    sides of every RPC.
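    
    A similarly hedged sketch of the new pattern (again raw epoll rather than
    the actual libev code): attempt the non-blocking write up front, and only
    arm the EPOLLOUT watcher if the kernel buffer could not take the whole
    message.
    
      #include <sys/epoll.h>
      #include <sys/socket.h>
    
      // Returns the number of bytes still unsent; 0 means no watcher (and no
      // epoll_ctl / epoll_wait) was needed at all. Error handling (e.g. errno
      // values other than EAGAIN/EWOULDBLOCK) omitted for brevity.
      static size_t QueueOutboundNew(int epfd, int fd, const char* buf, size_t len) {
        ssize_t n = send(fd, buf, len, MSG_DONTWAIT);   // optimistic write
        size_t sent = n > 0 ? static_cast<size_t>(n) : 0;
        if (sent == len) {
          return 0;                                     // common case: done
        }
        // Partial write or would-block: fall back to watching for writability.
        struct epoll_event ev;
        ev.events = EPOLLOUT;
        ev.data.fd = fd;
        epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
        return len - sent;
      }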
    
    I tested this using a new benchmark I'm working on which sends 20480
    synchronous RPCs from a single thread. I used 'perf trace' to count
    syscalls:
    
    Without the patch:
      Performance counter stats for 'build/latest/bin/rpc_stub-test
      --gtest_filter=*Trans* --benchmark_total_mb=40960
      --benchmark_method_pattern=fd*mmap*reuse*reuse)
      -benchmark-setup-pattern=all-same-*':
    
                  81,924      syscalls:sys_enter_epoll_ctl  (4 per RPC)
                 122,886      syscalls:sys_enter_epoll_wait (6 per RPC)
    
    With the patch:
      Performance counter stats for 'build/latest/bin/rpc_stub-test
      --gtest_filter=*Trans* --benchmark_total_mb=40960
      --benchmark_method_pattern=fd*mmap*reuse*reuse)
      -benchmark-setup-pattern=all-same-*':
    
                      6      syscalls:sys_enter_epoll_ctl  (0 per RPC)
                 81,927      syscalls:sys_enter_epoll_wait (4 per RPC)
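    
    Only the counter output is quoted above; as an assumed reconstruction
    (not necessarily the exact command used), counts like these can also be
    gathered with syscall tracepoints via perf stat, e.g.:
    
      perf stat -e syscalls:sys_enter_epoll_ctl \
                -e syscalls:sys_enter_epoll_wait \
                -- build/latest/bin/rpc_stub-test --gtest_filter=*Trans* ...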
    
    I also benchmarked with:
      rpc-bench --gtest_filter=\*Calls -client-threads=1 \
        -server-reactors=1 --gtest_repeat=10 2>&1 | grep Reqs/sec
    
    and ran a t-test on the results:
    
      	Welch Two Sample t-test
    
      data:  before and after
      t = -5.5671, df = 12.065, p-value = 0.00012
      alternative hypothesis: true difference in means is not equal to 0
      95 percent confidence interval:
       -2700.794 -1182.046
      sample estimates:
      mean of x mean of y
       25353.48  27294.90
    
    so with 95% confidence this improves throughput by roughly 5-10%: the
    interval of 1182-2701 additional reqs/sec against the baseline mean of
    25353 works out to about 4.7%-10.7%.
    
    Change-Id: I30af102224d5db2cb526b4c2ae981d6e9defd82a
    Reviewed-on: http://gerrit.cloudera.org:8080/15716
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/rpc/connection.cc | 26 ++++++++++++++------------
 src/kudu/rpc/connection.h  | 20 ++++++++++++++++++++
 src/kudu/rpc/rpc-test.cc   |  9 ++++++++-
 3 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index a49dbe2..f45f1d3 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -344,10 +344,11 @@ void Connection::QueueOutbound(unique_ptr<OutboundTransfer> transfer) {
   outbound_transfers_.push_back(*transfer.release());
 
   if (negotiation_complete_ && !write_io_.is_active()) {
-    // If we weren't currently in the middle of sending anything,
-    // then our write_io_ interest is stopped. Need to re-start it.
-    // Only do this after connection negotiation is done doing its work.
-    write_io_.start();
+    // Optimistically assume that the socket is writable if we didn't already
+    // have something queued.
+    if (ProcessOutboundTransfers() == kMoreToSend) {
+      write_io_.start();
+    }
   }
 }
 
@@ -746,19 +747,22 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
   }
   DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
 
-  OutboundTransfer *transfer;
   if (outbound_transfers_.empty()) {
     LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is "
       "nothing to write.";
     write_io_.stop();
     return;
   }
+  if (ProcessOutboundTransfers() == kNoMoreToSend) {
+    write_io_.stop();
+  }
+}
 
+Connection::ProcessOutboundTransfersResult Connection::ProcessOutboundTransfers() {
   while (!outbound_transfers_.empty()) {
-    transfer = &(outbound_transfers_.front());
+    OutboundTransfer* transfer = &(outbound_transfers_.front());
 
     if (!transfer->TransferStarted()) {
-
       if (transfer->is_for_outbound_call()) {
         CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
         if (!car->call) {
@@ -801,21 +805,19 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
     if (PREDICT_FALSE(!status.ok())) {
       LOG(WARNING) << ToString() << " send error: " << status.ToString();
       reactor_thread_->DestroyConnection(this, status);
-      return;
+      return kConnectionDestroyed;
     }
 
     if (!transfer->TransferFinished()) {
       DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
-      return;
+      return kMoreToSend;
     }
 
     outbound_transfers_.pop_front();
     delete transfer;
   }
 
-  // If we were able to write all of our outbound transfers,
-  // we don't have any more to write.
-  write_io_.stop();
+  return kNoMoreToSend;
 }
 
 std::string Connection::ToString() const {
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index ab3b029..a77d4d5 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -184,6 +184,26 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // libev callback when we may write to the socket.
   void WriteHandler(ev::io &watcher, int revents);
 
+  enum ProcessOutboundTransfersResult {
+    // All of the transfers in the queue have been sent successfully.
+    // The queue is now empty.
+    kNoMoreToSend,
+    // Not all transfers were able to be sent. The caller should
+    // ensure that write_io_ is enabled in order to continue attempting
+    // to send transfers.
+    kMoreToSend,
+    // An error occurred trying to write to the connection, and the
+    // connection was destroyed. NOTE: 'this' may be deleted if this
+    // value is returned.
+    kConnectionDestroyed
+  };
+
+  // Process any pending outbound transfers in outbound_transfers_.
+  // Result indicates the state of the connection following the attempt.
+  //
+  // NOTE: This may invoke DestroyConnection() on 'this'.
+  ProcessOutboundTransfersResult ProcessOutboundTransfers();
+
   // Safe to be called from other threads.
   std::string ToString() const;
 
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 67b51d8..52aede8 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -27,6 +27,7 @@
 #include <string>
 #include <thread>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -523,12 +524,18 @@ TEST_P(TestRpc, TestClientConnectionMetrics) {
     add_req.set_x(rand());
     add_req.set_y(rand());
     AddResponsePB add_resp;
+    string big_string(8 * 1024 * 1024, 'a');
 
     // Send the calls.
     vector<unique_ptr<RpcController>> controllers;
     CountDownLatch latch(n_calls);
     for (int i = 0; i < n_calls; i++) {
-      controllers.emplace_back(new RpcController());
+      unique_ptr<RpcController> rpc(new RpcController());
+      // Attach a big sidecar so that we are less likely to be able to send the
+      // whole RPC in a single write() call without queueing it.
+      int junk;
+      CHECK_OK(rpc->AddOutboundSidecar(RpcSidecar::FromSlice(big_string), &junk));
+      controllers.emplace_back(std::move(rpc));
       p.AsyncRequest(GenericCalculatorService::kAddMethodName, add_req, &add_resp,
                      controllers.back().get(), [&latch]() { latch.CountDown(); });
     }