You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/04/26 20:25:51 UTC
kudu git commit: [rpc] reopen_outbound_connections mode for reactor
Repository: kudu
Updated Branches:
refs/heads/master df9da6a84 -> ec10fd365
[rpc] reopen_outbound_connections mode for reactor
Introduced a test mode for the RPC ReactorThread: reopen an already
existing idle outbound connection when making another remote call
to the same server. This is groundwork for follow-up patches which
contain tests requiring connection negotiation to be done for every
RPC call.
Added a couple of new ReactorThread metrics to count the total number of
server- and client-side connections opened during the ReactorThread's lifetime.
Added a unit test to cover the newly introduced functionality. Also did
a minor clean-up of the ReactorThread code and its inline documentation.
Change-Id: I71ada660d8c92de1813a261e221f73017c2c764a
Reviewed-on: http://gerrit.cloudera.org:8080/6710
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ec10fd36
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ec10fd36
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ec10fd36
Branch: refs/heads/master
Commit: ec10fd36585a5553a943823e8144a3078aa1a61f
Parents: df9da6a
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Apr 20 23:07:32 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Apr 26 20:24:56 2017 +0000
----------------------------------------------------------------------
src/kudu/rpc/connection.h | 1 -
src/kudu/rpc/messenger.h | 1 +
src/kudu/rpc/reactor.cc | 64 +++++++++++++++++++++++++++++-------------
src/kudu/rpc/reactor.h | 17 ++++++++---
src/kudu/rpc/rpc-test.cc | 43 ++++++++++++++++++++++++++++
5 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index e475cc5..e165c29 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -111,7 +111,6 @@ class Connection : public RefCountedThreadSafe<Connection> {
// Queue a new call to be made. If the queueing fails, the call will be
// marked failed.
// Takes ownership of the 'call' object regardless of whether it succeeds or fails.
- // This may be called from a non-reactor thread.
void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
// Queue a call response back to the client on the server side.
http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 93e3626..7f2974d 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -248,6 +248,7 @@ class Messenger {
private:
FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+ FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
explicit Messenger(const MessengerBuilder &bld);
http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 1a74aaf..6d6a5d4 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -73,6 +73,17 @@ DEFINE_int64(rpc_negotiation_timeout_ms, 15000,
TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
+DEFINE_bool(rpc_reopen_outbound_connections, false,
+ "Open a new connection to the server for every RPC call, "
+ "if possible. If not enabled, an already existing connection to a "
+ "server is reused upon making another call to the same server. "
+ "When this flag is enabled, an already existing _idle_ connection "
+ "to the server is closed upon making another RPC call which would "
+ "reuse the connection otherwise. "
+ "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
namespace kudu {
namespace rpc {
@@ -91,7 +102,9 @@ ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
last_unused_tcp_scan_(cur_time_),
reactor_(reactor),
connection_keepalive_time_(bld.connection_keepalive_time_),
- coarse_timer_granularity_(bld.coarse_timer_granularity_) {
+ coarse_timer_granularity_(bld.coarse_timer_granularity_),
+ total_client_conns_cnt_(0),
+ total_server_conns_cnt_(0) {
}
Status ReactorThread::Init() {
@@ -127,17 +140,16 @@ void ReactorThread::ShutdownInternal() {
// Tear down any outbound TCP connections.
Status service_unavailable = ShutdownError(false);
VLOG(1) << name() << ": tearing down outbound TCP connections...";
- for (auto c = client_conns_.begin(); c != client_conns_.end();
- c = client_conns_.begin()) {
- const scoped_refptr<Connection>& conn = (*c).second;
+ for (const auto& elem : client_conns_) {
+ const auto& conn = elem.second;
VLOG(1) << name() << ": shutting down " << conn->ToString();
conn->Shutdown(service_unavailable);
- client_conns_.erase(c);
}
+ client_conns_.clear();
// Tear down any inbound TCP connections.
VLOG(1) << name() << ": tearing down inbound TCP connections...";
- for (const scoped_refptr<Connection>& conn : server_conns_) {
+ for (const auto& conn : server_conns_) {
VLOG(1) << name() << ": shutting down " << conn->ToString();
conn->Shutdown(service_unavailable);
}
@@ -162,10 +174,12 @@ ReactorTask::ReactorTask() {
ReactorTask::~ReactorTask() {
}
-Status ReactorThread::GetMetrics(ReactorMetrics *metrics) {
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
DCHECK(IsCurrentThread());
metrics->num_client_connections_ = client_conns_.size();
metrics->num_server_connections_ = server_conns_.size();
+ metrics->total_client_connections_ = total_client_conns_cnt_;
+ metrics->total_server_connections_ = total_server_conns_cnt_;
return Status::OK();
}
@@ -222,6 +236,7 @@ void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
DestroyConnection(conn.get(), s);
return;
}
+ ++total_server_conns_cnt_;
server_conns_.emplace_back(std::move(conn));
}
@@ -252,8 +267,7 @@ void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
"the timer handler.";
return;
}
- MonoTime now(MonoTime::Now());
- cur_time_ = now;
+ cur_time_ = MonoTime::Now();
ScanIdleConnections();
}
@@ -327,10 +341,23 @@ void ReactorThread::RunThread() {
Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
scoped_refptr<Connection>* conn) {
DCHECK(IsCurrentThread());
- conn_map_t::const_iterator c = client_conns_.find(conn_id);
- if (c != client_conns_.end()) {
- *conn = (*c).second;
- return Status::OK();
+ conn_map_t::const_iterator it = client_conns_.find(conn_id);
+ if (it != client_conns_.end()) {
+ const auto& c = it->second;
+ // Regular mode: reuse the connection to the same server.
+ if (PREDICT_TRUE(!FLAGS_rpc_reopen_outbound_connections)) {
+ *conn = c;
+ return Status::OK();
+ }
+
+ // Kind of 'one-connection-per-RPC' mode: reopen the idle connection.
+ if (!c->Idle()) {
+ *conn = c;
+ return Status::OK();
+ }
+ DCHECK_EQ(Connection::CLIENT, c->direction());
+ c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+ client_conns_.erase(it);
}
// No connection to this remote. Need to create one.
@@ -340,8 +367,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
// Create a new socket and start connecting to the remote.
Socket sock;
RETURN_NOT_OK(CreateClientSocket(&sock));
- bool connect_in_progress;
- RETURN_NOT_OK(StartConnect(&sock, conn_id.remote(), &connect_in_progress));
+ RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
std::unique_ptr<Socket> new_socket(new Socket(sock.Release()));
@@ -361,6 +387,8 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
// Insert into the client connection map to avoid duplicate connection requests.
client_conns_.insert(conn_map_t::value_type(conn_id, *conn));
+ ++total_client_conns_cnt_;
+
return Status::OK();
}
@@ -412,18 +440,16 @@ Status ReactorThread::CreateClientSocket(Socket *sock) {
return ret;
}
-Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote, bool *in_progress) {
- Status ret = sock->Connect(remote);
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
+ const Status ret = sock->Connect(remote);
if (ret.ok()) {
VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
- *in_progress = false; // connect() finished immediately.
return Status::OK();
}
int posix_code = ret.posix_code();
if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) {
VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
- *in_progress = true; // The connect operation is in progress.
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index a31392b..f31f69d 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -58,6 +58,11 @@ struct ReactorMetrics {
int32_t num_client_connections_;
// Number of server RPC connections currently connected.
int32_t num_server_connections_;
+
+ // Total number of client RPC connections opened during Reactor's lifetime.
+ uint64_t total_client_connections_;
+ // Total number of server RPC connections opened during Reactor's lifetime.
+ uint64_t total_server_connections_;
};
// A task which can be enqueued to run on the reactor thread.
@@ -136,7 +141,6 @@ class ReactorThread {
Status Init();
// Add any connections on this reactor thread into the given status dump.
- // May be called from another thread.
Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
DumpRunningRpcsResponsePB* resp);
@@ -213,9 +217,8 @@ class ReactorThread {
// Create a new client socket (non-blocking, NODELAY)
static Status CreateClientSocket(Socket *sock);
- // Initiate a new connection on the given socket, setting *in_progress
- // to true if the connection is still pending upon return.
- static Status StartConnect(Socket *sock, const Sockaddr &remote, bool *in_progress);
+ // Initiate a new connection on the given socket.
+ static Status StartConnect(Socket *sock, const Sockaddr &remote);
// Assign a new outbound call to the appropriate connection object.
// If this fails, the call is marked failed and completed.
@@ -264,6 +267,12 @@ class ReactorThread {
// Scan for idle connections on this granularity.
const MonoDelta coarse_timer_granularity_;
+
+ // Total number of client connections opened during Reactor's lifetime.
+ uint64_t total_client_conns_cnt_;
+
+ // Total number of server connections opened during Reactor's lifetime.
+ uint64_t total_server_conns_cnt_;
};
// A Reactor manages a ReactorThread
http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 675915b..aca9324 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -38,6 +38,7 @@
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
+DECLARE_bool(rpc_reopen_outbound_connections);
DECLARE_int32(rpc_negotiation_inject_delay_ms);
using std::shared_ptr;
@@ -248,6 +249,48 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
}
+// Test that outbound connections to the same server are reopen upon every RPC
+// call when the 'rpc_reopen_outbound_connections' flag is set.
+TEST_P(TestRpc, TestReopenOutboundConnections) {
+ // Set the flag to enable special mode: close and reopen already established
+ // outbound connections.
+ FLAGS_rpc_reopen_outbound_connections = true;
+
+ // Only run one reactor per messenger, so we can grab the metrics from that
+ // one without having to check all.
+ n_server_reactor_threads_ = 1;
+
+ // Set up server.
+ Sockaddr server_addr;
+ bool enable_ssl = GetParam();
+ StartTestServer(&server_addr, enable_ssl);
+
+ // Set up client.
+ LOG(INFO) << "Connecting to " << server_addr.ToString();
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+ Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+ // Verify the initial counters.
+ ReactorMetrics metrics;
+ ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(0, metrics.total_client_connections_);
+ ASSERT_EQ(0, metrics.total_server_connections_);
+ ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(0, metrics.total_client_connections_);
+ ASSERT_EQ(0, metrics.total_server_connections_);
+
+ // Run several iterations, just in case.
+ for (int i = 0; i < 32; ++i) {
+ ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+ ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(0, metrics.total_client_connections_);
+ ASSERT_EQ(i + 1, metrics.total_server_connections_);
+ ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(i + 1, metrics.total_client_connections_);
+ ASSERT_EQ(0, metrics.total_server_connections_);
+ }
+}
+
// Test that a call which takes longer than the keepalive time
// succeeds -- i.e that we don't consider a connection to be "idle" on the
// server if there is a call outstanding on it.