You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/18 20:24:59 UTC

kudu git commit: KUDU-2041: Fix negotiation deadlock

Repository: kudu
Updated Branches:
  refs/heads/master c0798a942 -> 677de010e


KUDU-2041: Fix negotiation deadlock

With N threads in the negotiation thread pool, N or more concurrent
client negotiation attempts could starve any incoming server negotiation
tasks that shared the same thread pool.

If the negotiation attempts form a cycle between nodes (each node's pool
saturated by N outgoing client negotiations), negotiation could deadlock,
at least until the negotiation timeout expires. Every node in the cycle
waits for a server-side negotiation to complete on its peer, but every
node has dedicated all of its negotiation threads to client-side requests.

Fix: run server and client negotiation tasks in two separate thread pools.

Testing: added a unit test that reproduces the issue and passes with the
fix applied.

Change-Id: I38379eeaf7516d432708c2a2a285839f96c86d4f
Reviewed-on: http://gerrit.cloudera.org:8080/7177
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/677de010
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/677de010
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/677de010

Branch: refs/heads/master
Commit: 677de010e39325ada1e9c150f87731da3b8698fb
Parents: c0798a9
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Apr 14 15:54:11 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sun Jun 18 20:24:31 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/messenger.cc    | 24 +++++++++++++++++++-----
 src/kudu/rpc/messenger.h     |  8 ++++++--
 src/kudu/rpc/reactor.cc      |  4 +++-
 src/kudu/rpc/rpc-test-base.h | 15 +++++++++++++--
 src/kudu/rpc/rpc-test.cc     | 24 ++++++++++++++++++++++++
 5 files changed, 65 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/677de010/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 28fea55..b5b8dfb 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -314,7 +314,8 @@ void Messenger::Shutdown() {
   // Need to shut down negotiation pool before the reactors, since the
   // reactors close the Connection sockets, and may race against the negotiation
   // threads' blocking reads & writes.
-  negotiation_pool_->Shutdown();
+  client_negotiation_pool_->Shutdown();
+  server_negotiation_pool_->Shutdown();
 
   for (Reactor* reactor : reactors_) {
     reactor->Shutdown();
@@ -416,10 +417,14 @@ Messenger::Messenger(const MessengerBuilder &bld)
   for (int i = 0; i < bld.num_reactors_; i++) {
     reactors_.push_back(new Reactor(retain_self_, i, bld));
   }
-  CHECK_OK(ThreadPoolBuilder("negotiator")
-              .set_min_threads(bld.min_negotiation_threads_)
-              .set_max_threads(bld.max_negotiation_threads_)
-              .Build(&negotiation_pool_));
+  CHECK_OK(ThreadPoolBuilder("client-negotiator")
+      .set_min_threads(bld.min_negotiation_threads_)
+      .set_max_threads(bld.max_negotiation_threads_)
+      .Build(&client_negotiation_pool_));
+  CHECK_OK(ThreadPoolBuilder("server-negotiator")
+      .set_min_threads(bld.min_negotiation_threads_)
+      .set_max_threads(bld.max_negotiation_threads_)
+      .Build(&server_negotiation_pool_));
 }
 
 Messenger::~Messenger() {
@@ -484,5 +489,14 @@ const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_nam
   }
 }
 
+ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) {
+  switch (dir) {
+    case Connection::CLIENT: return client_negotiation_pool_.get();
+    case Connection::SERVER: return server_negotiation_pool_.get();
+  }
+  DCHECK(false) << "Unknown Connection::Direction value: " << dir;
+  return nullptr;
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/677de010/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index c60ddaa..2ff5951 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -30,6 +30,7 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/util/locks.h"
@@ -227,7 +228,7 @@ class Messenger {
   RpcAuthentication authentication() const { return authentication_; }
   RpcEncryption encryption() const { return encryption_; }
 
-  ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
+  ThreadPool* negotiation_pool(Connection::Direction dir);
 
   RpczStore* rpcz_store() { return rpcz_store_.get(); }
 
@@ -287,7 +288,10 @@ class Messenger {
 
   std::vector<Reactor*> reactors_;
 
-  gscoped_ptr<ThreadPool> negotiation_pool_;
+  // Separate client and server negotiation pools to avoid possibility of distributed
+  // deadlock. See KUDU-2041.
+  gscoped_ptr<ThreadPool> client_negotiation_pool_;
+  gscoped_ptr<ThreadPool> server_negotiation_pool_;
 
   std::unique_ptr<security::TlsContext> tls_context_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/677de010/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index e235dd4..df4a661 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -446,7 +446,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
   TRACE("Submitting negotiation task for $0", conn->ToString());
   auto authentication = reactor()->messenger()->authentication();
   auto encryption = reactor()->messenger()->encryption();
-  RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
+  ThreadPool* negotiation_pool =
+      reactor()->messenger()->negotiation_pool(conn->direction());
+  RETURN_NOT_OK(negotiation_pool->SubmitClosure(
         Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/677de010/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index c40f546..c28218b 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -525,6 +525,11 @@ class RpcTestBase : public KuduTest {
     DoStartTestServer<CalculatorService>(server_addr, enable_ssl);
   }
 
+  void StartTestServerWithCustomMessenger(Sockaddr *server_addr,
+      const std::shared_ptr<Messenger>& messenger, bool enable_ssl = false) {
+    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, messenger);
+  }
+
   // Start a simple socket listening on a local port, returning the address.
   // This isn't an RPC server -- just a plain socket which can be helpful for testing.
   Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) {
@@ -548,8 +553,14 @@ class RpcTestBase : public KuduTest {
   }
 
   template<class ServiceClass>
-  void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
-    server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+  void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false,
+      const std::shared_ptr<Messenger>& messenger = nullptr) {
+    if (!messenger) {
+      server_messenger_ =
+          CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+    } else {
+      server_messenger_ = messenger;
+    }
     std::shared_ptr<AcceptorPool> pool;
     ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
     ASSERT_OK(pool->Start(2));

http://git-wip-us.apache.org/repos/asf/kudu/blob/677de010/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 63b7b73..cf98a54 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -103,6 +103,30 @@ TEST_F(TestRpc, TestConnHeaderValidation) {
   ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len)));
 }
 
+// Regression test for KUDU-2041
+TEST_P(TestRpc, TestNegotiationDeadlock) {
+  bool enable_ssl = GetParam();
+
+  // The deadlock would manifest in cases where the number of concurrent connection
+  // requests >= the number of threads. 1 thread and 1 cnxn to ourself is just the easiest
+  // way to reproduce the issue, because the server negotiation task must get queued after
+  // the client negotiation task if they share the same thread pool.
+  MessengerBuilder mb("TestRpc.TestNegotiationDeadlock");
+  mb.set_min_negotiation_threads(1)
+      .set_max_negotiation_threads(1)
+      .set_metric_entity(metric_entity_);
+  if (enable_ssl) mb.enable_inbound_tls();
+
+  shared_ptr<Messenger> messenger;
+  CHECK_OK(mb.Build(&messenger));
+
+  Sockaddr server_addr;
+  StartTestServerWithCustomMessenger(&server_addr, messenger, enable_ssl);
+
+  Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+  ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+}
+
 // Test making successful RPC calls.
 TEST_P(TestRpc, TestCall) {
   // Set up server.