You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/04/30 22:57:28 UTC

incubator-kudu git commit: KUDU-1409. Make krpc call timeouts more resistant to process or reactor pauses

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 59ff89dbc -> d0ec5205a


KUDU-1409. Make krpc call timeouts more resistant to process or reactor pauses

In stress testing Impala on Kudu I've seen various RPC timeouts that turn out
to be due to pauses on the client side. In particular, scenarios like
IMPALA-2800[1] can cause the memory allocator inside Impala to block for
several seconds, and that might cause us to think we missed a timeout.

This adds a little workaround in our libev timeout handling code. The
full description of the workaround can be found in connection.cc.

[1] https://issues.cloudera.org/browse/IMPALA-2800

Change-Id: I7bff0bc1573a059f12be8bd3f46e301275e78392
Reviewed-on: http://gerrit.cloudera.org:8080/2745
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d0ec5205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d0ec5205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d0ec5205

Branch: refs/heads/master
Commit: d0ec5205ad47b54c66bc8fa8f3cfd0241abe20b7
Parents: 59ff89d
Author: Todd Lipcon <to...@apache.org>
Authored: Sun Apr 10 20:15:13 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Apr 30 20:57:11 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc    | 51 +++++++++++++++++++++++++++++++++++++-
 src/kudu/rpc/connection.h     |  4 +++
 src/kudu/rpc/messenger.h      |  2 ++
 src/kudu/rpc/rpc-test-base.h  |  8 +++---
 src/kudu/rpc/rpc-test.cc      |  9 +++++--
 src/kudu/rpc/rpc_stub-test.cc | 35 ++++++++++++++++++++++++++
 6 files changed, 103 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 46a92d9..626664e 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -194,6 +194,21 @@ Connection::CallAwaitingResponse::~CallAwaitingResponse() {
 }
 
 void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) {
+  if (remaining_timeout > 0) {
+    if (watcher.remaining() < -1.0) {
+      LOG(WARNING) << "RPC call timeout handler was delayed by "
+                   << -watcher.remaining() << "s! This may be due to a process-wide "
+                   << "pause such as swapping, logging-related delays, or allocator lock "
+                   << "contention. Will allow an additional "
+                   << remaining_timeout << "s for a response.";
+    }
+
+    watcher.set(remaining_timeout, 0);
+    watcher.start();
+    remaining_timeout = 0;
+    return;
+  }
+
   conn->HandleOutboundCallTimeout(this);
 }
 
@@ -287,7 +302,41 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
     reactor_thread_->RegisterTimeout(&car->timeout_timer);
     car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
                            &CallAwaitingResponse::HandleTimeout>(car.get());
-    car->timeout_timer.set(timeout.ToSeconds(), 0);
+
+    // For calls with a timeout of at least 500ms, we actually run the timeout
+    // handler in two stages. The first timeout fires with a timeout 10% less
+    // than the user-specified one. It then schedules a second timeout for the
+    // remaining amount of time.
+    //
+    // The purpose of this two-stage timeout is to be more robust when the client
+    // has some process-wide pause, such as lock contention in tcmalloc, or a
+    // reactor callback that blocks in glog. Consider the following case:
+    //
+    // T = 0s        user issues an RPC with 5 second timeout
+    // T = 0.5s - 6s   process is blocked
+    // T = 6s        process unblocks, and the timeout fires (1s late)
+    //
+    // Without the two-stage timeout, we would determine that the call had timed out,
+    // even though it's likely that the response is waiting on our TCP socket.
+    // With the two-stage timeout, we'll end up with:
+    //
+    // T = 0s           user issues an RPC with 5 second timeout
+    // T = 0.5s - 6s    process is blocked
+    // T = 6s           process unblocks, and the first-stage timeout fires (1.5s late)
+    // T = 6s - 6.5s    time for the client to read the response which is waiting
+    // T = 6.5s         if the response was not actually available, we'll time out here
+    //
+    // We don't bother with this logic for calls with very short timeouts - presumably
+    // a user setting such a short RPC timeout is well equipped to handle a spurious timeout.
+    double time = timeout.ToSeconds();
+    if (time >= 0.5) {
+      car->remaining_timeout = time * 0.1;
+      time -= car->remaining_timeout;
+    } else {
+      car->remaining_timeout = 0;
+    }
+
+    car->timeout_timer.set(time, 0);
     car->timeout_timer.start();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 3f7e234..d12a168 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -182,6 +182,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
     Connection *conn;
     std::shared_ptr<OutboundCall> call;
     ev::timer timeout_timer;
+
+    // We time out RPC calls in two stages. This is set to the amount of timeout
+    // remaining after the next timeout fires. See Connection::QueueOutboundCall().
+    double remaining_timeout;
   };
 
   typedef std::unordered_map<uint64_t, CallAwaitingResponse*> car_map_t;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index e43278c..92c183b 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -177,6 +177,8 @@ class Messenger {
 
   RpczStore* rpcz_store() { return rpcz_store_.get(); }
 
+  int num_reactors() const { return reactors_.size(); }
+
   std::string name() const {
     return name_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 9be3bcb..24bacdf 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -339,7 +339,9 @@ class RpcTestBase : public KuduTest {
   void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
     SleepRequestPB req;
     SleepResponsePB resp;
-    req.set_sleep_micros(500000); // 0.5sec
+    // Sleep for 50ms longer than the call timeout.
+    int sleep_micros = timeout.ToMicroseconds() + 50000;
+    req.set_sleep_micros(sleep_micros);
 
     RpcController c;
     c.set_timeout(timeout);
@@ -354,8 +356,8 @@ class RpcTestBase : public KuduTest {
 
     // We shouldn't timeout significantly faster than our configured timeout.
     EXPECT_GE(elapsed_millis, expected_millis - 10);
-    // And we also shouldn't take the full 0.5sec that we asked for
-    EXPECT_LT(elapsed_millis, 500);
+    // And we also shouldn't take the full time that we asked for
+    EXPECT_LT(elapsed_millis * 1000, sleep_micros);
     EXPECT_TRUE(s.IsTimedOut());
     LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds();
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index e1d2c5b..001de38 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -294,8 +294,13 @@ TEST_F(TestRpc, TestCallTimeout) {
   // before.
   ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromNanoseconds(1)));
 
-  // Test a longer timeout - expect this will time out after we send the request.
-  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(10)));
+  // Test a longer timeout - expect this will time out after we send the request,
+  // but shorter than our threshold for two-stage timeout handling.
+  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(200)));
+
+  // Test a longer timeout - expect this will trigger the "two-stage timeout"
+  // code path.
+  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1500)));
 }
 
 static void AcceptAndReadForever(Socket* listen_sock) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0ec5205/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 193da0f..00adf1e 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -554,5 +554,40 @@ TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
   ASSERT_TRUE(my_refptr->HasOneRef());
 }
 
+// Regression test for KUDU-1409: if the client reactor thread is blocked (e.g. due to a
+// process-wide pause or a slow callback) then we should not cause RPC calls to time out.
+TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
+  CHECK_EQ(client_messenger_->num_reactors(), 1)
+      << "This test requires only a single reactor. Otherwise the injected sleep might "
+      << "be scheduled on a different reactor than the RPC call.";
+
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  // Schedule a 1-second sleep on the reactor thread.
+  //
+  // This will cause the reactor to be blocked while the call response is received, and
+  // still be blocked when the timeout would normally occur. Despite this, the call should
+  // not time out.
+  //
+  //  0s         0.5s          1.2s     1.5s
+  //  RPC call running
+  //  |---------------------|
+  //              Reactor blocked in sleep
+  //             |----------------------|
+  //                            \_ RPC would normally time out
+
+  client_messenger_->ScheduleOnReactor([](const Status& s) {
+      ThreadRestrictions::ScopedAllowWait allow_wait;
+      SleepFor(MonoDelta::FromSeconds(1));
+    }, MonoDelta::FromSeconds(0.5));
+
+  RpcController controller;
+  SleepRequestPB req;
+  SleepResponsePB resp;
+  req.set_sleep_micros(800 * 1000);
+  controller.set_timeout(MonoDelta::FromMilliseconds(1200));
+  ASSERT_OK(p.Sleep(req, &resp, &controller));
+}
+
 } // namespace rpc
 } // namespace kudu