You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/03/20 04:51:53 UTC
[kudu] 02/03: rpc: reduce context switches and receive calls
This is an automated email from the ASF dual-hosted git repository.
todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1a21e92057f61bd3b0581ac590a84bc7d7a21ac6
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 5 12:49:07 2020 -0800
rpc: reduce context switches and receive calls
* When queueing a task to execute on the reactor, avoid writing to the
eventfd to wake it up if such a write has already been done. This
should reduce the number of read/write syscalls to the eventfd and
avoid "spurious" wakeups of the reactor.
* When reading inbound data, read an extra 4 bytes, and if they're
available, loop around to read another call without putting the
reactor back to sleep.
The effect on context switches is clearly visible using
rpc-bench --gtest_filter=\*Async
Before:
I0305 12:50:56.463312 7468 rpc-bench.cc:128] Ctx Sw. per req: 0.640409
I0305 12:50:58.015260 7542 rpc-bench.cc:128] Ctx Sw. per req: 0.613172
I0305 12:50:59.563201 7587 rpc-bench.cc:128] Ctx Sw. per req: 0.589479
I0305 12:51:01.014848 7662 rpc-bench.cc:128] Ctx Sw. per req: 0.562744
I0305 12:51:02.666339 7736 rpc-bench.cc:128] Ctx Sw. per req: 0.569126
After:
I0305 12:52:03.567790 9005 rpc-bench.cc:128] Ctx Sw. per req: 0.383251
I0305 12:52:05.050909 9079 rpc-bench.cc:128] Ctx Sw. per req: 0.454404
I0305 12:52:06.626401 9138 rpc-bench.cc:128] Ctx Sw. per req: 0.3308
I0305 12:52:08.123154 9198 rpc-bench.cc:128] Ctx Sw. per req: 0.317752
I0305 12:52:09.666586 9272 rpc-bench.cc:128] Ctx Sw. per req: 0.391739
And on system CPU:
Before:
I0305 12:50:56.463310 7468 rpc-bench.cc:127] Sys CPU per req: 16.5524us
I0305 12:50:58.015259 7542 rpc-bench.cc:127] Sys CPU per req: 16.1158us
I0305 12:50:59.563199 7587 rpc-bench.cc:127] Sys CPU per req: 17.3184us
I0305 12:51:01.014847 7662 rpc-bench.cc:127] Sys CPU per req: 16.7911us
I0305 12:51:02.666337 7736 rpc-bench.cc:127] Sys CPU per req: 15.7659us
After:
I0305 12:52:03.567787 9005 rpc-bench.cc:127] Sys CPU per req: 13.0533us
I0305 12:52:05.050906 9079 rpc-bench.cc:127] Sys CPU per req: 13.7925us
I0305 12:52:06.626399 9138 rpc-bench.cc:127] Sys CPU per req: 11.6987us
I0305 12:52:08.123152 9198 rpc-bench.cc:127] Sys CPU per req: 11.9214us
I0305 12:52:09.666584 9272 rpc-bench.cc:127] Sys CPU per req: 13.4031us
And on syscalls:
todd@turbo:~/kudu$ grep recvfr /tmp/before /tmp/after
/tmp/before: 1458969 syscalls:sys_enter_recvfrom ( +- 1.99% )
/tmp/before: 1458969 syscalls:sys_exit_recvfrom ( +- 1.99% )
/tmp/after: 1252328 syscalls:sys_enter_recvfrom ( +- 1.82% )
/tmp/after: 1252328 syscalls:sys_exit_recvfrom ( +- 1.82% )
todd@turbo:~/kudu$ grep epoll_ctl /tmp/before /tmp/after
/tmp/before: 915862 syscalls:sys_enter_epoll_ctl ( +- 1.47% )
/tmp/before: 915862 syscalls:sys_exit_epoll_ctl ( +- 1.47% )
/tmp/after: 475978 syscalls:sys_enter_epoll_ctl ( +- 3.61% )
/tmp/after: 475978 syscalls:sys_exit_epoll_ctl ( +- 3.61% )
On a more macro-level benchmark (TSBS single-groupby-1-1-1 16 workers on an
8-core machine) this also reduces syscalls a bit, though the end-to-end
improvement is minimal.
Before:
Performance counter stats for 'system wide' (10 runs):
340,444 cs ( +- 0.30% )
144,024 syscalls:sys_enter_recvfrom ( +- 0.00% )
94,379 syscalls:sys_enter_epoll_ctl ( +- 0.06% )
129,376 syscalls:sys_enter_epoll_wait ( +- 0.10% )
2.025755946 seconds time elapsed ( +- 0.43% )
After:
Performance counter stats for 'system wide' (10 runs):
333,865 cs ( +- 0.27% )
119,216 syscalls:sys_enter_recvfrom ( +- 0.04% )
88,731 syscalls:sys_enter_epoll_ctl ( +- 0.08% )
104,149 syscalls:sys_enter_epoll_wait ( +- 0.08% )
2.005614271 seconds time elapsed ( +- 0.19% )
Change-Id: I32c5e4d146c25be8e90665a0cb8385fcd017b15c
Reviewed-on: http://gerrit.cloudera.org:8080/15440
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
Reviewed-by: Bankim Bhavsar <ba...@cloudera.com>
---
src/kudu/rpc/connection.cc | 17 ++++++-----
src/kudu/rpc/reactor.cc | 6 +++-
src/kudu/rpc/transfer.cc | 70 +++++++++++++++++++++++++++++++---------------
src/kudu/rpc/transfer.h | 11 ++++++--
4 files changed, 69 insertions(+), 35 deletions(-)
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 7dc0c03..a49dbe2 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -43,6 +43,7 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/slice.h"
@@ -645,11 +646,12 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
}
last_activity_time_ = reactor_thread_->cur_time();
+ faststring extra_buf;
while (true) {
if (!inbound_) {
inbound_.reset(new InboundTransfer());
}
- Status status = inbound_->ReceiveBuffer(*socket_);
+ Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
if (PREDICT_FALSE(!status.ok())) {
if (status.posix_code() == ESHUTDOWN) {
VLOG(1) << ToString() << " shut down by remote end.";
@@ -673,14 +675,11 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
LOG(FATAL) << "Invalid direction: " << direction_;
}
- // TODO: it would seem that it would be good to loop around and see if
- // there is more data on the socket by trying another recv(), but it turns
- // out that it really hurts throughput to do so. A better approach
- // might be for each InboundTransfer to actually try to read an extra byte,
- // and if it succeeds, then we'd copy that byte into a new InboundTransfer
- // and loop around, since it's likely the next call also arrived at the
- // same time.
- break;
+ if (extra_buf.size() > 0) {
+ inbound_.reset(new InboundTransfer(std::move(extra_buf)));
+ } else {
+ break;
+ }
}
}
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 231783b..e77488c 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -928,6 +928,7 @@ void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
}
void Reactor::ScheduleReactorTask(ReactorTask *task) {
+ bool was_empty;
{
std::unique_lock<LockType> l(lock_);
if (closing_) {
@@ -936,9 +937,12 @@ void Reactor::ScheduleReactorTask(ReactorTask *task) {
task->Abort(ShutdownError(false));
return;
}
+ was_empty = pending_tasks_.empty();
pending_tasks_.push_back(*task);
}
- thread_.WakeThread();
+ if (was_empty) {
+ thread_.WakeThread();
+ }
}
bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index fb6d6a2..7693a91 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -20,6 +20,7 @@
#include <sys/uio.h>
#include <algorithm>
+#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
@@ -88,32 +89,43 @@ TransferCallbacks::~TransferCallbacks()
{}
InboundTransfer::InboundTransfer()
- : total_length_(kMsgLengthPrefixLength),
+ : total_length_(0),
cur_offset_(0) {
buf_.resize(kMsgLengthPrefixLength);
}
-Status InboundTransfer::ReceiveBuffer(Socket &socket) {
- if (cur_offset_ < kMsgLengthPrefixLength) {
- // receive uint32 length prefix
- int32_t rem = kMsgLengthPrefixLength - cur_offset_;
- int32_t nread;
- Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
- RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
- if (nread == 0) {
- return Status::OK();
- }
- DCHECK_GE(nread, 0);
- cur_offset_ += nread;
+InboundTransfer::InboundTransfer(faststring initial_buf)
+ : buf_(std::move(initial_buf)),
+ total_length_(0),
+ cur_offset_(buf_.size()) {
+ buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
+ static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
+ if (total_length_ == 0) {
+ // We haven't yet parsed the message length. It's possible that the
+ // length is already available in the buffer passed in the constructor.
if (cur_offset_ < kMsgLengthPrefixLength) {
- // If we still don't have the full length prefix, we can't continue
- // reading yet.
- return Status::OK();
+ // receive uint32 length prefix
+ int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+ int32_t nread;
+ Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
+ RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+ if (nread == 0) {
+ return Status::OK();
+ }
+ DCHECK_GE(nread, 0);
+ cur_offset_ += nread;
+ if (cur_offset_ < kMsgLengthPrefixLength) {
+ // If we still don't have the full length prefix, we can't continue
+ // reading yet.
+ return Status::OK();
+ }
}
- // Since we only read 'rem' bytes above, we should now have exactly
- // the length prefix in our buffer and no more.
- DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
+ // Parse the message length out of the prefix.
+ DCHECK_GE(cur_offset_, kMsgLengthPrefixLength);
// The length prefix doesn't include its own 4 bytes, so we have to
// add that back in.
total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
@@ -126,7 +138,7 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
total_length_));
}
- buf_.resize(total_length_);
+ buf_.resize(total_length_ + kExtraReadLength);
// Fall through to receive the message body, which is likely to be already
// available on the socket.
@@ -139,12 +151,24 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
// INT_MAX. The message will be split across multiple Recv() calls.
// Note that this is only needed when rpc_max_message_size > INT_MAX, which is
// currently only used for unit tests.
- int32_t rem = std::min(total_length_ - cur_offset_,
+ int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength,
static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
- Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+ Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
cur_offset_ += nread;
+ // We may have read some extra bytes, in which case we need to trim them off
+ // and write them into the provided buffer.
+ if (cur_offset_ >= total_length_) {
+ int64_t extra_read = cur_offset_ - total_length_;
+ DCHECK_LE(extra_read, kExtraReadLength);
+ DCHECK_GE(extra_read, 0);
+ extra_4->clear();
+ extra_4->append(&buf_[total_length_], extra_read);
+ cur_offset_ = total_length_;
+ buf_.resize(total_length_);
+ }
+
return Status::OK();
}
@@ -153,7 +177,7 @@ bool InboundTransfer::TransferStarted() const {
}
bool InboundTransfer::TransferFinished() const {
- return cur_offset_ == total_length_;
+ return total_length_ > 0 && cur_offset_ == total_length_;
}
string InboundTransfer::StatusAsString() const {
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 0628f2c..3cab6b1 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -67,9 +67,15 @@ class InboundTransfer {
public:
InboundTransfer();
+ explicit InboundTransfer(faststring initial_buf);
- // read from the socket into our buffer
- Status ReceiveBuffer(Socket &socket);
+ // Read from the socket into our buffer.
+ //
+ // If this is the last read of the transfer (i.e. if TransferFinished() is true
+ // after this call returns OK), up to 4 extra bytes may have been read
+ // from the socket and stored in 'extra_4'. In that case, any previous content of
+ // 'extra_4' is replaced by these extra bytes.
+ Status ReceiveBuffer(Socket *socket, faststring* extra_4);
// Return true if any bytes have yet been sent.
bool TransferStarted() const;
@@ -91,6 +97,7 @@ class InboundTransfer {
faststring buf_;
+ // 0 indicates not yet set
uint32_t total_length_;
uint32_t cur_offset_;