You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by fr...@apache.org on 2019/07/02 14:02:18 UTC

[impala] 01/03: IMPALA-6159 / KUDU-2192: Enable TCP keepalive for all outbound connections

This is an automated email from the ASF dual-hosted git repository.

fredyw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7206b52e5b6ab1df045e4249d859129d32aacf6b
Author: Michael Ho <kw...@cloudera.com>
AuthorDate: Thu Jun 20 18:34:02 2019 -0700

    IMPALA-6159 / KUDU-2192: Enable TCP keepalive for all outbound connections
    
    This change enables TCP keepalive for all outbound connections.
    This aims to handle cases in which the remote peer may have
    dropped off the network without sending a TCP RST. For instance,
    a remote host could have hit a kernel panic and been power cycled.
    In that case, the existing TCP connection to that host may be
    stale. In an idle cluster, this stale connection may not be detected
    until its next use, at which point it will result in an RPC
    failure due to the TCP RST sent from the restarted peer.
    
    By enabling TCP keepalive, we ensure that stale TCP connections
    in an idle cluster will be detected and closed within a time bound
    so a new connection will be created on the next use. This change
    introduces 3 different flags:
    
    --tcp_keepalive_probe_period_s: the duration in seconds a TCP connection
    has to be idle before keepalive probes start being sent.
    
    --tcp_keepalive_retry_period_s: the duration in seconds between successive
    keepalive probes if previous probes didn't get an ACK from remote peer.
    
    --tcp_keepalive_retry_count: the maximum number of TCP keepalive probes
    sent without an ACK before declaring the remote peer as dead.
    
    Testing:
    - Used tcpdump to verify that keepalive probes are being sent periodically.
    - Verified that blocking all incoming traffic to a server's port via an iptables
    rule caused the TCP connection to be closed and the keepalive probes to stop
    eventually.
    
    Change-Id: Iaa1d66d83aea1cc82d07fc6217be5fc1306695bc
    Reviewed-on: http://gerrit.cloudera.org:8080/13702
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Todd Lipcon <to...@apache.org>
    Tested-by: Kudu Jenkins
    Reviewed-on: http://gerrit.cloudera.org:8080/13764
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/rpc/connection.cc  |  8 ++++++++
 be/src/kudu/rpc/connection.h   |  7 +++++++
 be/src/kudu/rpc/reactor.cc     | 33 ++++++++++++++++++++++++++++++++-
 be/src/kudu/rpc/reactor.h      |  4 ++++
 be/src/kudu/rpc/rpc-test.cc    | 37 ++++++++++++++++++++++++++++++++++++-
 be/src/kudu/util/net/socket.cc | 18 ++++++++++++++++++
 be/src/kudu/util/net/socket.h  |  7 +++++++
 7 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
index 1632dd3..c9a7576 100644
--- a/be/src/kudu/rpc/connection.cc
+++ b/be/src/kudu/rpc/connection.cc
@@ -82,6 +82,14 @@ Status Connection::SetNonBlocking(bool enabled) {
   return socket_->SetNonBlocking(enabled);
 }
 
+Status Connection::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+  DCHECK_GT(idle_time_s, 0);
+  DCHECK_GT(retry_time_s, 0);  // Linux rejects TCP_KEEPINTVL < 1 with EINVAL.
+  DCHECK_GT(num_retries, 0);  // Linux rejects TCP_KEEPCNT < 1 with EINVAL.
+  return socket_->SetTcpKeepAlive(std::max(1, idle_time_s), std::max(1, retry_time_s),
+      std::max(1, num_retries));  // Clamp so release builds never pass a rejected value.
+}
+
 void Connection::EpollRegister(ev::loop_ref& loop) {
   DCHECK(reactor_thread_->IsCurrentThread());
   DVLOG(4) << "Registering connection for epoll: " << ToString();
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 362a35b..84b325d 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -98,6 +98,13 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Set underlying socket to non-blocking (or blocking) mode.
   Status SetNonBlocking(bool enabled);
 
+  // Enable TCP keepalive on the underlying socket. Once the connection has been idle
+  // for 'idle_time_s' seconds, a keepalive probe is sent to the remote end. While no
+  // ACK is heard from the peer, the probe is retried every 'retry_time_s' seconds, up
+  // to 'num_retries' times. 'idle_time_s' must be positive. The returned Status is the
+  // one reported by the underlying socket.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
   // Register our socket with an epoll loop.  We will only ever be registered in
   // one epoll loop at a time.
   void EpollRegister(ev::loop_ref& loop);
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index c1832ef..fd88afb 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -49,6 +49,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/thread_restrictions.h"
@@ -80,6 +81,21 @@ DEFINE_bool(rpc_reopen_outbound_connections, false,
 TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
 TAG_FLAG(rpc_reopen_outbound_connections, runtime);
 
+DEFINE_int32(tcp_keepalive_probe_period_s, 60,  // Keepalive setup is skipped unless > 0.
+             "The duration in seconds after an outbound connection has gone idle "
+             "before a TCP keepalive probe is sent to the peer. Set to 0 to disable "
+             "TCP keepalive probes from being sent.");
+DEFINE_int32(tcp_keepalive_retry_period_s, 3,  // Becomes TCP_KEEPINTVL on Linux.
+             "The duration in seconds between successive keepalive probes from an "
+             "outbound connection if the previous probes are not acknowledged. "
+             "Effective only if --tcp_keepalive_probe_period_s is not 0.");
+DEFINE_int32(tcp_keepalive_retry_count, 10,  // Becomes TCP_KEEPCNT on Linux.
+             "The maximum number of keepalive probes sent before declaring the remote "
+             "end as dead. Effective only if --tcp_keepalive_probe_period_s is not 0.");
+TAG_FLAG(tcp_keepalive_probe_period_s, advanced);
+TAG_FLAG(tcp_keepalive_retry_period_s, advanced);
+TAG_FLAG(tcp_keepalive_retry_count, advanced);
+
 METRIC_DEFINE_histogram(server, reactor_load_percent,
                         "Reactor Thread Load Percentage",
                         kudu::MetricUnit::kUnits,
@@ -133,7 +149,8 @@ ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
     connection_keepalive_time_(bld.connection_keepalive_time_),
     coarse_timer_granularity_(bld.coarse_timer_granularity_),
     total_client_conns_cnt_(0),
-    total_server_conns_cnt_(0) {
+    total_server_conns_cnt_(0),
+    rng_(GetRandomSeed32()) {
 
   if (bld.metric_entity_) {
     invoke_us_histogram_ =
@@ -608,6 +625,20 @@ void ReactorThread::CompleteConnectionNegotiation(
     return;
   }
 
+  if (FLAGS_tcp_keepalive_probe_period_s > 0) {
+    // Try spreading out the idle poll period to avoid thundering herd in case connections
+    // are all created at the same time (e.g. after a cluster is restarted).
+    Status keepalive_status = conn->SetTcpKeepAlive(
+        FLAGS_tcp_keepalive_probe_period_s + rng_.Uniform32(4),
+        FLAGS_tcp_keepalive_retry_period_s, FLAGS_tcp_keepalive_retry_count);
+    if (PREDICT_FALSE(!keepalive_status.ok())) {
+      LOG(DFATAL) << "Unable to set TCP keepalive for connection: "
+                  << keepalive_status.ToString();
+      DestroyConnection(conn.get(), keepalive_status, std::move(rpc_error));
+      return;
+    }
+  }
+
   conn->MarkNegotiationComplete();
   conn->EpollRegister(loop_);
 }
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index ce251c1..e01f71e 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -37,6 +37,7 @@
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
@@ -327,6 +328,9 @@ class ReactorThread {
   // started.
   int64_t total_poll_cycles_ = 0;
 
+  // Random number generator for randomizing the TCP keepalive interval.
+  Random rng_;
+
   // Accounting for determining load average in each cycle of TimerHandler.
   struct {
     // The cycle-time at which the load average was last calculated.
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 88b12da..5f2be73 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -75,6 +75,9 @@ METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
 DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
+DECLARE_int32(tcp_keepalive_probe_period_s);
+DECLARE_int32(tcp_keepalive_retry_period_s);
+DECLARE_int32(tcp_keepalive_retry_count);
 
 using std::shared_ptr;
 using std::string;
@@ -766,6 +769,34 @@ TEST_P(TestRpc, TestCallLongerThanKeepalive) {
                                  req, &resp, &controller));
 }
 
+// Issue an RPC that keeps the TCP connection idle for longer than the keepalive
+// probe period, and verify that it still succeeds (i.e. the connection stays open).
+TEST_P(TestRpc, TestTCPKeepalive) {
+  const bool use_ssl = GetParam();
+  // Bring up the server side first.
+  Sockaddr srv_addr;
+  ASSERT_OK(StartTestServer(&srv_addr, use_ssl));
+
+  // Shrink every keepalive knob to 1, then set up the client messenger and proxy.
+  FLAGS_tcp_keepalive_probe_period_s = 1;
+  FLAGS_tcp_keepalive_retry_period_s = 1;
+  FLAGS_tcp_keepalive_retry_count = 1;
+  shared_ptr<Messenger> messenger;
+  ASSERT_OK(CreateMessenger("Client", &messenger, 1, use_ssl));
+  Proxy proxy(messenger, srv_addr, srv_addr.host(),
+      GenericCalculatorService::static_service_name());
+
+  // Ask the server to sleep for 8 seconds, well past the keepalive probe period,
+  // so that keepalive probes get triggered while the call is outstanding.
+  SleepRequestPB sleep_req;
+  sleep_req.set_deferred(true);
+  sleep_req.set_sleep_micros(8 * 1000 * 1000);
+  SleepResponsePB sleep_resp;
+  RpcController rpc;
+  ASSERT_OK(proxy.SyncRequest(GenericCalculatorService::kSleepMethodName,
+      sleep_req, &sleep_resp, &rpc));
+}
+
 // Test that the RpcSidecar transfers the expected messages.
 TEST_P(TestRpc, TestRpcSidecar) {
   // Set up server.
@@ -1052,11 +1083,15 @@ TEST_F(TestRpc, TestServerShutsDown) {
     // EINVAL is possible if the controller socket had already disconnected by
     // the time it trys to set the SO_SNDTIMEO socket option as part of the
     // normal blocking SASL handshake.
+    //
+    // ENOTCONN is possible simply because the server closes the connection
+    // after the connection is established.
     ASSERT_TRUE(s.posix_code() == EPIPE ||
                 s.posix_code() == ECONNRESET ||
                 s.posix_code() == ESHUTDOWN ||
                 s.posix_code() == ECONNREFUSED ||
-                s.posix_code() == EINVAL)
+                s.posix_code() == EINVAL ||
+                s.posix_code() == ENOTCONN)
       << "Unexpected status: " << s.ToString();
   }
 }
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
index bc3b525..e58bb4e 100644
--- a/be/src/kudu/util/net/socket.cc
+++ b/be/src/kudu/util/net/socket.cc
@@ -582,6 +582,24 @@ Status Socket::SetTimeout(int opt, const char* optname, const MonoDelta& timeout
   return Status::OK();
 }
 
+Status Socket::SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries) {
+#if defined(__linux__)  // The keepalive options below are applied on Linux builds only.
+  static const char* const err_string = "failed to set socket option $0 to $1";
+  DCHECK_GT(idle_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPIDLE, idle_time_s),
+      Substitute(err_string, "TCP_KEEPIDLE", idle_time_s));
+  DCHECK_GT(retry_time_s, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPINTVL, retry_time_s),
+      Substitute(err_string, "TCP_KEEPINTVL", retry_time_s));
+  DCHECK_GT(num_retries, 0);
+  RETURN_NOT_OK_PREPEND(SetSockOpt(IPPROTO_TCP, TCP_KEEPCNT, num_retries),
+      Substitute(err_string, "TCP_KEEPCNT", num_retries));
+  RETURN_NOT_OK_PREPEND(SetSockOpt(SOL_SOCKET, SO_KEEPALIVE, 1),
+      "failed to enable TCP KeepAlive socket option");  // Enabled last, after the tuning.
+#endif  // defined(__linux__)
+  return Status::OK();  // Also reached on non-Linux builds, where nothing was configured.
+}
+
 template<typename T>
 Status Socket::SetSockOpt(int level, int option, const T& value) {
   if (::setsockopt(fd_, level, option, &value, sizeof(T)) == -1) {
diff --git a/be/src/kudu/util/net/socket.h b/be/src/kudu/util/net/socket.h
index 992c44a..0f561a7 100644
--- a/be/src/kudu/util/net/socket.h
+++ b/be/src/kudu/util/net/socket.h
@@ -153,6 +153,13 @@ class Socket {
   // See also readn() from Stevens (2004) or Kerrisk (2010)
   Status BlockingRecv(uint8_t *buf, size_t amt, size_t *nread, const MonoTime& deadline);
 
+  // Enable TCP keepalive for this socket. After the connection has been idle for
+  // 'idle_time_s' seconds, a keepalive probe is sent to the remote end; unacknowledged
+  // probes are retried every 'retry_time_s' seconds, up to 'num_retries' times. All
+  // three arguments must be positive. The options are only applied on Linux; on other
+  // platforms this call returns Status::OK() without changing the socket.
+  Status SetTcpKeepAlive(int idle_time_s, int retry_time_s, int num_retries);
+
  private:
   // Called internally from SetSend/RecvTimeout().
   Status SetTimeout(int opt, const char* optname, const MonoDelta& timeout);