Posted to commits@kudu.apache.org by al...@apache.org on 2017/04/26 20:25:51 UTC

kudu git commit: [rpc] reopen_outbound_connections mode for reactor

Repository: kudu
Updated Branches:
  refs/heads/master df9da6a84 -> ec10fd365


[rpc] reopen_outbound_connections mode for reactor

Introduced a test mode for the RPC ReactorThread: reopen an already
existing idle outbound connection upon making another remote call
to the same server.  This is groundwork for follow-up patches that
contain tests requiring connection negotiation to be performed on
every RPC call.
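
For illustration, here is a minimal, self-contained sketch of the lookup
policy this mode introduces (Conn, the integer key, and the boolean
parameter are stand-ins for Kudu's Connection, ConnectionId, and the new
gflag; the actual implementation is the FindOrStartConnection() change in
the diff below):

  #include <memory>
  #include <unordered_map>

  // Stand-in for a connection: only the bits the policy cares about.
  struct Conn {
    bool Idle() const { return idle; }
    void Shutdown() { open = false; }
    bool idle = true;
    bool open = true;
  };

  using ConnMap = std::unordered_map<int, std::shared_ptr<Conn>>;

  // Simplified model of the find-or-start logic under the new flag.
  std::shared_ptr<Conn> FindOrStart(ConnMap& conns, int key,
                                    bool reopen_outbound_connections) {
    auto it = conns.find(key);
    if (it != conns.end()) {
      // Regular mode, or a connection that is still busy: reuse it as before.
      if (!reopen_outbound_connections || !it->second->Idle()) {
        return it->second;
      }
      // Test mode with an idle cached connection: shut it down and drop it,
      // forcing a fresh connection (and a fresh negotiation) below.
      it->second->Shutdown();
      conns.erase(it);
    }
    auto fresh = std::make_shared<Conn>();
    conns.emplace(key, fresh);
    return fresh;
  }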

Added a couple of new ReactorThread metrics to count the total number
of server- and client-side connections opened during the ReactorThread's
lifetime.
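
These counters surface through the existing GetMetrics() call; the new unit
test reads them roughly as follows (excerpt-style sketch built on the
rpc-test.cc fixtures, e.g. server_messenger_ and the gtest ASSERT_OK macro):

  ReactorMetrics metrics;
  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
  // Lifetime totals added by this patch; the pre-existing
  // num_{client,server}_connections_ gauges are unchanged.
  LOG(INFO) << "client connections opened: " << metrics.total_client_connections_;
  LOG(INFO) << "server connections opened: " << metrics.total_server_connections_;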

Added a unit test to cover the newly introduced functionality.  Also did
a minor clean-up of the ReactorThread code and its inline documentation.

Change-Id: I71ada660d8c92de1813a261e221f73017c2c764a
Reviewed-on: http://gerrit.cloudera.org:8080/6710
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ec10fd36
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ec10fd36
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ec10fd36

Branch: refs/heads/master
Commit: ec10fd36585a5553a943823e8144a3078aa1a61f
Parents: df9da6a
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Apr 20 23:07:32 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Apr 26 20:24:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.h |  1 -
 src/kudu/rpc/messenger.h  |  1 +
 src/kudu/rpc/reactor.cc   | 64 +++++++++++++++++++++++++++++-------------
 src/kudu/rpc/reactor.h    | 17 ++++++++---
 src/kudu/rpc/rpc-test.cc  | 43 ++++++++++++++++++++++++++++
 5 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index e475cc5..e165c29 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -111,7 +111,6 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Queue a new call to be made. If the queueing fails, the call will be
   // marked failed.
   // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
-  // This may be called from a non-reactor thread.
   void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
   // Queue a call response back to the client on the server side.

http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 93e3626..7f2974d 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -248,6 +248,7 @@ class Messenger {
 
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 
   explicit Messenger(const MessengerBuilder &bld);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 1a74aaf..6d6a5d4 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -73,6 +73,17 @@ DEFINE_int64(rpc_negotiation_timeout_ms, 15000,
 TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
 TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
 
+DEFINE_bool(rpc_reopen_outbound_connections, false,
+            "Open a new connection to the server for every RPC call, "
+            "if possible. If not enabled, an already existing connection to a "
+            "server is reused upon making another call to the same server. "
+            "When this flag is enabled, an already existing _idle_ connection "
+            "to the server is closed upon making another RPC call which would "
+            "reuse the connection otherwise. "
+            "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
 namespace kudu {
 namespace rpc {
 
@@ -91,7 +102,9 @@ ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
     last_unused_tcp_scan_(cur_time_),
     reactor_(reactor),
     connection_keepalive_time_(bld.connection_keepalive_time_),
-    coarse_timer_granularity_(bld.coarse_timer_granularity_) {
+    coarse_timer_granularity_(bld.coarse_timer_granularity_),
+    total_client_conns_cnt_(0),
+    total_server_conns_cnt_(0) {
 }
 
 Status ReactorThread::Init() {
@@ -127,17 +140,16 @@ void ReactorThread::ShutdownInternal() {
   // Tear down any outbound TCP connections.
   Status service_unavailable = ShutdownError(false);
   VLOG(1) << name() << ": tearing down outbound TCP connections...";
-  for (auto c = client_conns_.begin(); c != client_conns_.end();
-       c = client_conns_.begin()) {
-    const scoped_refptr<Connection>& conn = (*c).second;
+  for (const auto& elem : client_conns_) {
+    const auto& conn = elem.second;
     VLOG(1) << name() << ": shutting down " << conn->ToString();
     conn->Shutdown(service_unavailable);
-    client_conns_.erase(c);
   }
+  client_conns_.clear();
 
   // Tear down any inbound TCP connections.
   VLOG(1) << name() << ": tearing down inbound TCP connections...";
-  for (const scoped_refptr<Connection>& conn : server_conns_) {
+  for (const auto& conn : server_conns_) {
     VLOG(1) << name() << ": shutting down " << conn->ToString();
     conn->Shutdown(service_unavailable);
   }
@@ -162,10 +174,12 @@ ReactorTask::ReactorTask() {
 ReactorTask::~ReactorTask() {
 }
 
-Status ReactorThread::GetMetrics(ReactorMetrics *metrics) {
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
   DCHECK(IsCurrentThread());
   metrics->num_client_connections_ = client_conns_.size();
   metrics->num_server_connections_ = server_conns_.size();
+  metrics->total_client_connections_ = total_client_conns_cnt_;
+  metrics->total_server_connections_ = total_server_conns_cnt_;
   return Status::OK();
 }
 
@@ -222,6 +236,7 @@ void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
     DestroyConnection(conn.get(), s);
     return;
   }
+  ++total_server_conns_cnt_;
   server_conns_.emplace_back(std::move(conn));
 }
 
@@ -252,8 +267,7 @@ void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
       "the timer handler.";
     return;
   }
-  MonoTime now(MonoTime::Now());
-  cur_time_ = now;
+  cur_time_ = MonoTime::Now();
 
   ScanIdleConnections();
 }
@@ -327,10 +341,23 @@ void ReactorThread::RunThread() {
 Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
                                             scoped_refptr<Connection>* conn) {
   DCHECK(IsCurrentThread());
-  conn_map_t::const_iterator c = client_conns_.find(conn_id);
-  if (c != client_conns_.end()) {
-    *conn = (*c).second;
-    return Status::OK();
+  conn_map_t::const_iterator it = client_conns_.find(conn_id);
+  if (it != client_conns_.end()) {
+    const auto& c = it->second;
+    // Regular mode: reuse the connection to the same server.
+    if (PREDICT_TRUE(!FLAGS_rpc_reopen_outbound_connections)) {
+      *conn = c;
+      return Status::OK();
+    }
+
+    // Kind of 'one-connection-per-RPC' mode: reopen the idle connection.
+    if (!c->Idle()) {
+      *conn = c;
+      return Status::OK();
+    }
+    DCHECK_EQ(Connection::CLIENT, c->direction());
+    c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+    client_conns_.erase(it);
   }
 
   // No connection to this remote. Need to create one.
@@ -340,8 +367,7 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   // Create a new socket and start connecting to the remote.
   Socket sock;
   RETURN_NOT_OK(CreateClientSocket(&sock));
-  bool connect_in_progress;
-  RETURN_NOT_OK(StartConnect(&sock, conn_id.remote(), &connect_in_progress));
+  RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
 
   std::unique_ptr<Socket> new_socket(new Socket(sock.Release()));
 
@@ -361,6 +387,8 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
 
   // Insert into the client connection map to avoid duplicate connection requests.
   client_conns_.insert(conn_map_t::value_type(conn_id, *conn));
+  ++total_client_conns_cnt_;
+
   return Status::OK();
 }
 
@@ -412,18 +440,16 @@ Status ReactorThread::CreateClientSocket(Socket *sock) {
   return ret;
 }
 
-Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote, bool *in_progress) {
-  Status ret = sock->Connect(remote);
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
+  const Status ret = sock->Connect(remote);
   if (ret.ok()) {
     VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
-    *in_progress = false; // connect() finished immediately.
     return Status::OK();
   }
 
   int posix_code = ret.posix_code();
   if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) {
     VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
-    *in_progress = true; // The connect operation is in progress.
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index a31392b..f31f69d 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -58,6 +58,11 @@ struct ReactorMetrics {
   int32_t num_client_connections_;
   // Number of server RPC connections currently connected.
   int32_t num_server_connections_;
+
+  // Total number of client RPC connections opened during Reactor's lifetime.
+  uint64_t total_client_connections_;
+  // Total number of server RPC connections opened during Reactor's lifetime.
+  uint64_t total_server_connections_;
 };
 
 // A task which can be enqueued to run on the reactor thread.
@@ -136,7 +141,6 @@ class ReactorThread {
   Status Init();
 
   // Add any connections on this reactor thread into the given status dump.
-  // May be called from another thread.
   Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
                          DumpRunningRpcsResponsePB* resp);
 
@@ -213,9 +217,8 @@ class ReactorThread {
   // Create a new client socket (non-blocking, NODELAY)
   static Status CreateClientSocket(Socket *sock);
 
-  // Initiate a new connection on the given socket, setting *in_progress
-  // to true if the connection is still pending upon return.
-  static Status StartConnect(Socket *sock, const Sockaddr &remote, bool *in_progress);
+  // Initiate a new connection on the given socket.
+  static Status StartConnect(Socket *sock, const Sockaddr &remote);
 
   // Assign a new outbound call to the appropriate connection object.
   // If this fails, the call is marked failed and completed.
@@ -264,6 +267,12 @@ class ReactorThread {
 
   // Scan for idle connections on this granularity.
   const MonoDelta coarse_timer_granularity_;
+
+  // Total number of client connections opened during Reactor's lifetime.
+  uint64_t total_client_conns_cnt_;
+
+  // Total number of server connections opened during Reactor's lifetime.
+  uint64_t total_server_conns_cnt_;
 };
 
 // A Reactor manages a ReactorThread

http://git-wip-us.apache.org/repos/asf/kudu/blob/ec10fd36/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 675915b..aca9324 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -38,6 +38,7 @@
 METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
 METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
+DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
 
 using std::shared_ptr;
@@ -248,6 +249,48 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
 }
 
+// Test that outbound connections to the same server are reopened upon every RPC
+// call when the 'rpc_reopen_outbound_connections' flag is set.
+TEST_P(TestRpc, TestReopenOutboundConnections) {
+  // Set the flag to enable special mode: close and reopen already established
+  // outbound connections.
+  FLAGS_rpc_reopen_outbound_connections = true;
+
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+  // Verify the initial counters.
+  ReactorMetrics metrics;
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.total_client_connections_);
+  ASSERT_EQ(0, metrics.total_server_connections_);
+
+  // Run several iterations, just in case.
+  for (int i = 0; i < 32; ++i) {
+    ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+    ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+    ASSERT_EQ(0, metrics.total_client_connections_);
+    ASSERT_EQ(i + 1, metrics.total_server_connections_);
+    ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+    ASSERT_EQ(i + 1, metrics.total_client_connections_);
+    ASSERT_EQ(0, metrics.total_server_connections_);
+  }
+}
+
 // Test that a call which takes longer than the keepalive time
 // succeeds -- i.e that we don't consider a connection to be "idle" on the
 // server if there is a call outstanding on it.