You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2017/12/27 16:44:19 UTC
impala git commit: KUDU-2237: Allow idle server connection scanning
to be disabled
Repository: impala
Updated Branches:
refs/heads/master abab176e2 -> fac2b2cd9
KUDU-2237: Allow idle server connection scanning to be disabled
Currently, a server connection being idle for more than
FLAGS_rpc_default_keepalive_time_ms ms will be closed.
However, certain services (e.g. Impala) using KRPC may want to
keep the idle connections alive for various reasons (e.g. sheer
number of connections to re-establish, negotiation overhead
in a secure cluster). To avoid idle connection from being
closed, one currently have to set FLAGS_rpc_default_keepalive_time_ms
to a very large value.
This change implements a cleaner solution by disabling idle
connection scanning if FLAGS_rpc_default_keepalive_time_ms is
set to any negative value. This avoids the unnecessary
overhead of scanning for idle server connections and alleviates
the user from having to pick a random large number to make sure
the connection is always kept alive.
Change-Id: I6161b9e753f05620784565a417d248acf8e7050a
Reviewed-on: http://gerrit.cloudera.org:8080/8831
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/8909
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fac2b2cd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fac2b2cd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fac2b2cd
Branch: refs/heads/master
Commit: fac2b2cd9b365b8637806ec9eb64e7f03912c24c
Parents: abab176
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Dec 13 14:30:25 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Dec 23 01:34:23 2017 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/messenger.h | 1 +
be/src/kudu/rpc/reactor.cc | 40 ++++++++++++++++++-----------------
be/src/kudu/rpc/reactor.h | 3 ++-
be/src/kudu/rpc/rpc-test-base.h | 15 +++++++------
be/src/kudu/rpc/rpc-test.cc | 41 ++++++++++++++++++++++++++++++++++++
5 files changed, 73 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index 80d8a2f..8756842 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -260,6 +260,7 @@ class Messenger {
private:
FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+ FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
FRIEND_TEST(TestRpc, TestCredentialsPolicy);
FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index 313a08b..e6a216c 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -308,28 +308,30 @@ void ReactorThread::ScanIdleConnections() {
// Enforce TCP connection timeouts: server-side connections.
const auto server_conns_end = server_conns_.end();
uint64_t timed_out = 0;
- for (auto it = server_conns_.begin(); it != server_conns_end; ) {
- Connection* conn = it->get();
- if (!conn->Idle()) {
- VLOG(10) << "Connection " << conn->ToString() << " not idle";
- ++it;
- continue;
- }
+ // Scan for idle server connections if it's enabled.
+ if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) {
+ for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+ Connection* conn = it->get();
+ if (!conn->Idle()) {
+ VLOG(10) << "Connection " << conn->ToString() << " not idle";
+ ++it;
+ continue;
+ }
- const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
- if (connection_delta <= connection_keepalive_time_) {
- ++it;
- continue;
- }
+ const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+ if (connection_delta <= connection_keepalive_time_) {
+ ++it;
+ continue;
+ }
- conn->Shutdown(Status::NetworkError(
- Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
- VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
- << connection_delta.ToString();
- ++timed_out;
- it = server_conns_.erase(it);
+ conn->Shutdown(Status::NetworkError(
+ Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+ VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+ << connection_delta.ToString();
+ ++timed_out;
+ it = server_conns_.erase(it);
+ }
}
-
// Take care of idle client-side connections marked for shutdown.
uint64_t shutdown = 0;
for (auto it = client_conns_.begin(); it != client_conns_.end();) {
http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index 2c1bfc8..7efbe94 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -227,7 +227,8 @@ class ReactorThread {
std::unique_ptr<ErrorStatusPB> rpc_error = {});
// Scan any open connections for idle ones that have been idle longer than
- // connection_keepalive_time_
+ // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
+ // is skipped.
void ScanIdleConnections();
// Create a new client socket (non-blocking, NODELAY)
http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index c8c84ca..ca7ccc4 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -437,13 +437,14 @@ class RpcTestBase : public KuduTest {
}
bld.set_num_reactors(n_reactors);
- bld.set_connection_keepalive_time(
- MonoDelta::FromMilliseconds(keepalive_time_ms_));
- // In order for the keepalive timing to be accurate, we need to scan connections
- // significantly more frequently than the keepalive time. This "coarse timer"
- // granularity determines this.
- bld.set_coarse_timer_granularity(MonoDelta::FromMilliseconds(
- std::min(keepalive_time_ms_ / 5, 100)));
+ bld.set_connection_keepalive_time(MonoDelta::FromMilliseconds(keepalive_time_ms_));
+ if (keepalive_time_ms_ >= 0) {
+ // In order for the keepalive timing to be accurate, we need to scan connections
+ // significantly more frequently than the keepalive time. This "coarse timer"
+ // granularity determines this.
+ bld.set_coarse_timer_granularity(
+ MonoDelta::FromMilliseconds(std::min(keepalive_time_ms_ / 5, 100)));
+ }
bld.set_metric_entity(metric_entity_);
std::shared_ptr<Messenger> messenger;
CHECK_OK(bld.Build(&messenger));
http://git-wip-us.apache.org/repos/asf/impala/blob/fac2b2cd/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 598a202..c97423b 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -363,6 +363,47 @@ TEST_P(TestRpc, TestConnectionKeepalive) {
ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
}
+// Test that idle connection is kept alive when 'keepalive_time_ms_' is set to -1.
+TEST_P(TestRpc, TestConnectionAlwaysKeepalive) {
+ // Only run one reactor per messenger, so we can grab the metrics from that
+ // one without having to check all.
+ n_server_reactor_threads_ = 1;
+ keepalive_time_ms_ = -1;
+
+ // Set up server.
+ Sockaddr server_addr;
+ bool enable_ssl = GetParam();
+ StartTestServer(&server_addr, enable_ssl);
+
+ // Set up client.
+ LOG(INFO) << "Connecting to " << server_addr.ToString();
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+ Proxy p(client_messenger, server_addr, server_addr.host(),
+ GenericCalculatorService::static_service_name());
+
+ ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+
+ ReactorMetrics metrics;
+ ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection";
+ ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+ ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+ ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+
+ SleepFor(MonoDelta::FromSeconds(3));
+
+ // After sleeping, the connection should still be alive.
+ ASSERT_OK(server_messenger_->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connections";
+ ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
+
+ ASSERT_OK(client_messenger->reactors_[0]->GetMetrics(&metrics));
+ ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
+ ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";
+}
+
// Test that outbound connections to the same server are reopen upon every RPC
// call when the 'rpc_reopen_outbound_connections' flag is set.
TEST_P(TestRpc, TestReopenOutboundConnections) {