You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/30 04:08:52 UTC
[4/4] incubator-impala git commit: KUDU-2041: Fix negotiation deadlock
KUDU-2041: Fix negotiation deadlock
With N threads in the negotiation threadpool, N or more concurrent
client negotiation attempts could starve any incoming server negotiation
tasks which used the same threadpool.
If the set of negotiation attempts forms a graph with N cycles, the
negotiation could deadlock (at least until the negotiation timeout
expires) as all nodes in the system wait for a server request to
complete, but all nodes have dedicated all their resources to client
requests.
Fix: split the server and client tasks into two separate pools.
Testing: add a unit test which reproduces the issue, and passes with the
fix applied.
Change-Id: I38379eeaf7516d432708c2a2a285839f96c86d4f
Reviewed-on: http://gerrit.cloudera.org:8080/7177
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-on: http://gerrit.cloudera.org:8080/7742
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cc4816b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cc4816b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cc4816b3
Branch: refs/heads/master
Commit: cc4816b3d6ae2ca00ce61b1bc442161bfbe6de3f
Parents: d5670d6
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Apr 14 15:54:11 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 30 04:04:08 2017 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/messenger.cc | 24 +++++++++++++++++++-----
be/src/kudu/rpc/messenger.h | 8 ++++++--
be/src/kudu/rpc/reactor.cc | 4 +++-
be/src/kudu/rpc/rpc-test-base.h | 15 +++++++++++++--
be/src/kudu/rpc/rpc-test.cc | 24 ++++++++++++++++++++++++
5 files changed, 65 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
index 1cec5bf..2884fba 100644
--- a/be/src/kudu/rpc/messenger.cc
+++ b/be/src/kudu/rpc/messenger.cc
@@ -333,7 +333,8 @@ void Messenger::Shutdown() {
// Need to shut down negotiation pool before the reactors, since the
// reactors close the Connection sockets, and may race against the negotiation
// threads' blocking reads & writes.
- negotiation_pool_->Shutdown();
+ client_negotiation_pool_->Shutdown();
+ server_negotiation_pool_->Shutdown();
for (Reactor* reactor : reactors_) {
reactor->Shutdown();
@@ -435,10 +436,14 @@ Messenger::Messenger(const MessengerBuilder &bld)
for (int i = 0; i < bld.num_reactors_; i++) {
reactors_.push_back(new Reactor(retain_self_, i, bld));
}
- CHECK_OK(ThreadPoolBuilder("negotiator")
- .set_min_threads(bld.min_negotiation_threads_)
- .set_max_threads(bld.max_negotiation_threads_)
- .Build(&negotiation_pool_));
+ CHECK_OK(ThreadPoolBuilder("client-negotiator")
+ .set_min_threads(bld.min_negotiation_threads_)
+ .set_max_threads(bld.max_negotiation_threads_)
+ .Build(&client_negotiation_pool_));
+ CHECK_OK(ThreadPoolBuilder("server-negotiator")
+ .set_min_threads(bld.min_negotiation_threads_)
+ .set_max_threads(bld.max_negotiation_threads_)
+ .Build(&server_negotiation_pool_));
}
Messenger::~Messenger() {
@@ -503,5 +508,14 @@ const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_nam
}
}
+ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) {
+ switch (dir) {
+ case Connection::CLIENT: return client_negotiation_pool_.get();
+ case Connection::SERVER: return server_negotiation_pool_.get();
+ }
+ DCHECK(false) << "Unknown Connection::Direction value: " << dir;
+ return nullptr;
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index 1ba76a7..9a8ebab 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -30,6 +30,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/security/token.pb.h"
#include "kudu/util/locks.h"
@@ -227,7 +228,7 @@ class Messenger {
RpcAuthentication authentication() const { return authentication_; }
RpcEncryption encryption() const { return encryption_; }
- ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
+ ThreadPool* negotiation_pool(Connection::Direction dir);
RpczStore* rpcz_store() { return rpcz_store_.get(); }
@@ -287,7 +288,10 @@ class Messenger {
std::vector<Reactor*> reactors_;
- gscoped_ptr<ThreadPool> negotiation_pool_;
+ // Separate client and server negotiation pools to avoid the possibility of
+ // distributed deadlock. See KUDU-2041.
+ gscoped_ptr<ThreadPool> client_negotiation_pool_;
+ gscoped_ptr<ThreadPool> server_negotiation_pool_;
std::unique_ptr<security::TlsContext> tls_context_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index e235dd4..df4a661 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -446,7 +446,9 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
TRACE("Submitting negotiation task for $0", conn->ToString());
auto authentication = reactor()->messenger()->authentication();
auto encryption = reactor()->messenger()->encryption();
- RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
+ ThreadPool* negotiation_pool =
+ reactor()->messenger()->negotiation_pool(conn->direction());
+ RETURN_NOT_OK(negotiation_pool->SubmitClosure(
Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index c40f546..c28218b 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -525,6 +525,11 @@ class RpcTestBase : public KuduTest {
DoStartTestServer<CalculatorService>(server_addr, enable_ssl);
}
+ void StartTestServerWithCustomMessenger(Sockaddr *server_addr,
+ const std::shared_ptr<Messenger>& messenger, bool enable_ssl = false) {
+ DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, messenger);
+ }
+
// Start a simple socket listening on a local port, returning the address.
// This isn't an RPC server -- just a plain socket which can be helpful for testing.
Status StartFakeServer(Socket *listen_sock, Sockaddr *listen_addr) {
@@ -548,8 +553,14 @@ class RpcTestBase : public KuduTest {
}
template<class ServiceClass>
- void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
- server_messenger_ = CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+ void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false,
+ const std::shared_ptr<Messenger>& messenger = nullptr) {
+ if (!messenger) {
+ server_messenger_ =
+ CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+ } else {
+ server_messenger_ = messenger;
+ }
std::shared_ptr<AcceptorPool> pool;
ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
ASSERT_OK(pool->Start(2));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cc4816b3/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 47ca528..2378892 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -108,6 +108,30 @@ TEST_F(TestRpc, TestConnHeaderValidation) {
ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len)));
}
+// Regression test for KUDU-2041
+TEST_P(TestRpc, TestNegotiationDeadlock) {
+ bool enable_ssl = GetParam();
+
+ // The deadlock would manifest in cases where the number of concurrent connection
+ // requests >= the number of threads. 1 thread and 1 connection to ourselves is the
+ // easiest way to reproduce the issue, because the server negotiation task must get
+ // queued after the client negotiation task if they share the same thread pool.
+ MessengerBuilder mb("TestRpc.TestNegotiationDeadlock");
+ mb.set_min_negotiation_threads(1)
+ .set_max_negotiation_threads(1)
+ .set_metric_entity(metric_entity_);
+ if (enable_ssl) mb.enable_inbound_tls();
+
+ shared_ptr<Messenger> messenger;
+ CHECK_OK(mb.Build(&messenger));
+
+ Sockaddr server_addr;
+ StartTestServerWithCustomMessenger(&server_addr, messenger, enable_ssl);
+
+ Proxy p(messenger, server_addr, GenericCalculatorService::static_service_name());
+ ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));
+}
+
// Test making successful RPC calls.
TEST_P(TestRpc, TestCall) {
// Set up server.