You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2017/12/27 16:44:19 UTC

impala git commit: KUDU-2237: Allow idle server connection scanning to be disabled

Repository: impala
Updated Branches:
  refs/heads/master abab176e2 -> fac2b2cd9


KUDU-2237: Allow idle server connection scanning to be disabled

Currently, a server connection that has been idle for more than
FLAGS_rpc_default_keepalive_time_ms milliseconds is closed.
However, certain services (e.g. Impala) using KRPC may want to
keep idle connections alive for various reasons (e.g. the sheer
number of connections to re-establish, or the negotiation overhead
in a secure cluster). To prevent idle connections from being
closed, one currently has to set FLAGS_rpc_default_keepalive_time_ms
to a very large value.

This change implements a cleaner solution by disabling idle
connection scanning if FLAGS_rpc_default_keepalive_time_ms is
set to any negative value. This avoids the unnecessary
overhead of scanning for idle server connections and relieves
the user of having to pick an arbitrary large number to make sure
the connection is always kept alive.

Change-Id: I6161b9e753f05620784565a417d248acf8e7050a
Reviewed-on: http://gerrit.cloudera.org:8080/8831
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/8909
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fac2b2cd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fac2b2cd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fac2b2cd

Branch: refs/heads/master
Commit: fac2b2cd9b365b8637806ec9eb64e7f03912c24c
Parents: abab176
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Dec 13 14:30:25 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Dec 23 01:34:23 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/messenger.h     |  1 +
 be/src/kudu/rpc/reactor.cc      | 40 ++++++++++++++++++-----------------
 be/src/kudu/rpc/reactor.h       |  3 ++-
 be/src/kudu/rpc/rpc-test-base.h | 15 +++++++------
 be/src/kudu/rpc/rpc-test.cc     | 41 ++++++++++++++++++++++++++++++++++++
 5 files changed, 73 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index 80d8a2f..8756842 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -260,6 +260,7 @@ class Messenger {
 
  private:
   FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
   FRIEND_TEST(TestRpc, TestCredentialsPolicy);
   FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index 313a08b..e6a216c 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -308,28 +308,30 @@ void ReactorThread::ScanIdleConnections() {
   // Enforce TCP connection timeouts: server-side connections.
   const auto server_conns_end = server_conns_.end();
   uint64_t timed_out = 0;
-  for (auto it = server_conns_.begin(); it != server_conns_end; ) {
-    Connection* conn = it->get();
-    if (!conn->Idle()) {
-      VLOG(10) << "Connection " << conn->ToString() << " not idle";
-      ++it;
-      continue;
-    }
+  // Scan for idle server connections if it's enabled.
+  if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) {
+    for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+      Connection* conn = it->get();
+      if (!conn->Idle()) {
+        VLOG(10) << "Connection " << conn->ToString() << " not idle";
+        ++it;
+        continue;
+      }
 
-    const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
-    if (connection_delta <= connection_keepalive_time_) {
-      ++it;
-      continue;
-    }
+      const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+      if (connection_delta <= connection_keepalive_time_) {
+        ++it;
+        continue;
+      }
 
-    conn->Shutdown(Status::NetworkError(
-        Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
-    VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
-            << connection_delta.ToString();
-    ++timed_out;
-    it = server_conns_.erase(it);
+      conn->Shutdown(Status::NetworkError(
+          Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+      VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+              << connection_delta.ToString();
+      ++timed_out;
+      it = server_conns_.erase(it);
+    }
   }
-
   // Take care of idle client-side connections marked for shutdown.
   uint64_t shutdown = 0;
   for (auto it = client_conns_.begin(); it != client_conns_.end();) {

http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index 2c1bfc8..7efbe94 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -227,7 +227,8 @@ class ReactorThread {
                          std::unique_ptr<ErrorStatusPB> rpc_error = {});
 
   // Scan any open connections for idle ones that have been idle longer than
-  // connection_keepalive_time_
+  // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
+  // is skipped.
   void ScanIdleConnections();
 
   // Create a new client socket (non-blocking, NODELAY)

http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index c8c84ca..ca7ccc4 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -437,13 +437,14 @@ class RpcTestBase : public KuduTest {
     }
 
     bld.set_num_reactors(n_reactors);
-    bld.set_connection_keepalive_time(
-      MonoDelta::FromMilliseconds(keepalive_time_ms_));
-    // In order for the keepalive timing to be accurate, we need to scan connections
-    // significantly more frequently than the keepalive time. This "coarse timer"
-    // granularity determines this.
-    bld.set_coarse_timer_granularity(MonoDelta::FromMilliseconds(
-                                       std::min(keepalive_time_ms_ / 5, 100)));
+    bld.set_connection_keepalive_time(MonoDelta::FromMilliseconds(keepalive_time_ms_));
+    if (keepalive_time_ms_ >= 0) {
+      // In order for the keepalive timing to be accurate, we need to scan connections
+      // significantly more frequently than the keepalive time. This "coarse timer"
+      // granularity determines this.
+      bld.set_coarse_timer_granularity(
+          MonoDelta::FromMilliseconds(std::min(keepalive_time_ms_ / 5, 100)));
+    }
     bld.set_metric_entity(metric_entity_);
     std::shared_ptr<Messenger> messenger;
     CHECK_OK(bld.Build(&messenger));

http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 598a202..c97423b 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -363,6 +363,47 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
   ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
 }
 
+// Test that idle connection is kept alive when 'keepalive_time_ms_' is set to -1.
+TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
+  // Only run one reactor per messenger, so we can grab the metrics from that
+  // one without having to check all.
+  n_server_reactor_threads_ = 1;
+  keepalive_time_ms_ = -1;
+
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, server_addr.host(),
+          GenericCalculatorService::static_service_name());
+
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+
+  ReactorMetrics metrics;
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection";
+  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+
+  SleepFor(MonoDelta::FromSeconds(3));
+
+  // After sleeping, the connection should still be alive.
+  ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connections";
+  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+  ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+}
+
 // Test that outbound connections to the same server are reopen upon every RPC
 // call when the 'rpc_reopen_outbound_connections' flag is set.
 TEST_P(TestRpc, TestReopenOutboundConnections) {