You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/03/13 22:36:57 UTC
impala git commit: KUDU-2334: Fix OutboundTransfer::TransferStarted()
to work with SSL_write()
Repository: impala
Updated Branches:
refs/heads/master 0d7787fe4 -> 8079cd9d2
KUDU-2334: Fix OutboundTransfer::TransferStarted() to work with SSL_write()
Previously, OutboundTransfer::TransferStarted() returned true iff
non-zero bytes have been successfully sent via Writev(). As it turns
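out, this doesn't work well with SSL_write(). When SSL_write() returns -1
and SSL_get_error() reports SSL_ERROR_WANT_WRITE or SSL_ERROR_WANT_READ (which
the socket layer reports as EAGAIN), the call must be retried next time with
exactly the same arguments, including the same buffer pointer, even if 0 bytes
have been written.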
The following sequence becomes problematic with the previous implementation
of OutboundTransfer::TransferStarted():
- WriteHandler() calls SendBuffer() on an OutboundTransfer.
- SendBuffer() calls TlsSocket::Writev() which hits the EAGAIN error above.
Since 0 bytes were written, cur_slice_idx_ and cur_offset_in_slice_ remain 0
and OutboundTransfer::TransferStarted() still returns false.
- The OutboundTransfer is cancelled or times out, and car->call is set to NULL.
- WriteHandler() is called again and, as it notices that the OutboundTransfer
hasn't really started yet and "car->call" is NULL due to cancellation, it
removes it from the outbound transfer queue and moves on to the next entry
in the queue.
- WriteHandler() calls SendBuffer() with the next entry in the queue and
eventually calls SSL_write() with a different buffer than expected by
SSL_write(), leading to "SSL3_WRITE_PENDING:bad write retry" error.
This change fixes the problem above by adding a boolean flag 'started_'
which is set to true if OutboundTransfer::SendBuffer() has been called
at least once. Also added some tests to exercise cancellation paths with
multiple concurrent RPCs.
Confirmed the problem above is fixed by running a stress test on a 130-node
cluster with Impala. Without the fix, the problem occurred consistently.
Change-Id: Id7ebdcbc1ef2a3e0c5e7162f03214c232755b683
Reviewed-on: http://gerrit.cloudera.org:8080/9587
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9606
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8079cd9d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8079cd9d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8079cd9d
Branch: refs/heads/master
Commit: 8079cd9d2a87051f81a41910b74fab15e35f36ea
Parents: 0d7787f
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Mar 12 12:27:34 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 13 22:20:39 2018 +0000
----------------------------------------------------------------------
be/src/kudu/rpc/rpc-test-base.h | 4 ++
be/src/kudu/rpc/rpc-test.cc | 72 ++++++++++++++++++++++++++++++++++++
be/src/kudu/rpc/transfer.cc | 4 +-
be/src/kudu/rpc/transfer.h | 5 +++
4 files changed, 84 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index a30e8dc..332a7a1 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -50,6 +50,8 @@
#include "kudu/util/test_util.h"
#include "kudu/util/trace.h"
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
namespace kudu {
namespace rpc {
@@ -438,9 +440,11 @@ class RpcTestBase : public KuduTest {
MessengerBuilder bld(name);
if (enable_ssl) {
+ FLAGS_rpc_encrypt_loopback_connections = true;
bld.set_epki_cert_key_files(rpc_certificate_file, rpc_private_key_file);
bld.set_epki_certificate_authority_file(rpc_ca_certificate_file);
bld.set_epki_private_password_key_cmd(rpc_private_key_password_cmd);
+ bld.set_rpc_encryption("required");
bld.enable_inbound_tls();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index dcbe5a7..f6d930f 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -699,6 +699,8 @@ TEST_P(TestRpc, TestRpcSidecarLimits) {
ASSERT_STR_MATCHES(status.ToString(),
// Linux
"Connection reset by peer"
+ // Linux, SSL enabled
+ "|failed to read from TLS socket"
// macOS, while reading from socket.
"|got EOF from remote"
// macOS, while writing to socket.
@@ -1119,6 +1121,7 @@ static void SleepCallback(uint8_t* payload, CountDownLatch* latch) {
latch->CountDown();
}
+// Test to verify that sidecars aren't corrupted when cancelling an async RPC.
TEST_P(TestRpc, TestCancellationAsync) {
// Set up server.
Sockaddr server_addr;
@@ -1168,5 +1171,74 @@ TEST_P(TestRpc, TestCancellationAsync) {
client_messenger->Shutdown();
}
+// Helper for TestCancellationMultiThreads(): sends 40 async PushTwoStrings RPCs via 'p',
+// each carrying two sidecars that both reference 'slice'. For 7 of every 8 RPCs it sleeps
+// for a random delay below 100 microseconds (rand.Uniform64(100), which may be 0) and then
+// cancels the RPC; each RPC must end Aborted, ServiceUnavailable or OK, else CHECK fails.
+static void SendAndCancelRpcs(Proxy* p, const Slice& slice) {
+ RpcController controller;  // Reused for every RPC; Reset() at the top of each iteration.
+
+ // Used to generate sleep time between invoking RPC and requesting cancellation.
+ Random rand(SeedRandom());
+
+ for (int i = 0; i < 40; ++i) {
+ controller.Reset();
+ PushTwoStringsRequestPB request;
+ PushTwoStringsResponsePB resp;
+ int idx;
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx));
+ request.set_sidecar1_idx(idx);
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx));
+ request.set_sidecar2_idx(idx);
+ // 'latch' is counted down by the completion callback; latch.Wait() below blocks on it.
+ CountDownLatch latch(1);
+ p->AsyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+ request, &resp, &controller,
+ boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
+ // Cancel all but every 8th RPC (i = 0, 8, 16, 24, 32 are never cancelled).
+ if ((i % 8) != 0) {
+ // Random sub-100us delay so the cancellation races the in-flight RPC.
+ SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(100)));
+ controller.Cancel();
+ }
+ latch.Wait();
+ CHECK(controller.status().IsAborted() || controller.status().IsServiceUnavailable() ||
+ controller.status().ok()) << controller.status().ToString();
+ }
+}
+
+// KUDU-2334 regression test: 30 threads share one client messenger and Proxy and
+// concurrently send and cancel RPCs to the same server (SSL on or off per GetParam()).
+TEST_P(TestRpc, TestCancellationMultiThreads) {
+ // Set up server.
+ Sockaddr server_addr;
+ bool enable_ssl = GetParam();
+ ASSERT_OK(StartTestServer(&server_addr, enable_ssl));
+
+ // Set up client.
+ LOG(INFO) << "Connecting to " << server_addr.ToString();
+ shared_ptr<Messenger> client_messenger;
+ ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, enable_ssl));
+ Proxy p(client_messenger, server_addr, server_addr.host(),
+ GenericCalculatorService::static_service_name());
+
+ // 16 MiB of 'a' bytes; SendAndCancelRpcs() attaches 'slice' as both sidecars of each RPC.
+ string buf(16 * 1024 * 1024, 'a');
+ Slice slice(buf);
+ // Start 30 threads, each running SendAndCancelRpcs() against the shared Proxy 'p'.
+ // NOTE(review): an ASSERT_OK failure returns early while started threads still use 'p'/'buf'.
+ std::vector<scoped_refptr<Thread>> threads;
+ for (int i = 0; i < 30; ++i) {
+ scoped_refptr<Thread> rpc_thread;
+ ASSERT_OK(Thread::Create("test", "rpc", SendAndCancelRpcs, &p, slice, &rpc_thread));
+ threads.push_back(rpc_thread);
+ }
+ // Wait for all threads to complete.
+ for (scoped_refptr<Thread>& rpc_thread : threads) {
+ rpc_thread->Join();
+ }
+ client_messenger->Shutdown();
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
index f83c432..4203b03 100644
--- a/be/src/kudu/rpc/transfer.cc
+++ b/be/src/kudu/rpc/transfer.cc
@@ -159,6 +159,7 @@ OutboundTransfer::OutboundTransfer(int32_t call_id,
cur_offset_in_slice_(0),
callbacks_(callbacks),
call_id_(call_id),
+ started_(false),
aborted_(false) {
n_payload_slices_ = n_payload_slices;
@@ -185,6 +186,7 @@ void OutboundTransfer::Abort(const Status &status) {
Status OutboundTransfer::SendBuffer(Socket &socket) {
CHECK_LT(cur_slice_idx_, n_payload_slices_);
+ started_ = true;
int n_iovecs = n_payload_slices_ - cur_slice_idx_;
struct iovec iovec[n_iovecs];
{
@@ -232,7 +234,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) {
}
bool OutboundTransfer::TransferStarted() const {
- return cur_offset_in_slice_ != 0 || cur_slice_idx_ != 0;
+ return started_;  // Set by SendBuffer() even when 0 bytes were written (KUDU-2334).
}
bool OutboundTransfer::TransferFinished() const {
http://git-wip-us.apache.org/repos/asf/impala/blob/8079cd9d/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
index 533fe83..2c81658 100644
--- a/be/src/kudu/rpc/transfer.h
+++ b/be/src/kudu/rpc/transfer.h
@@ -187,6 +187,11 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
// In the case of call responses, kInvalidCallId
int32_t call_id_;
+ // True if SendBuffer() has been called at least once. This can be true even if
+ // no bytes were sent successfully. This is needed as SSL_write() is stateful.
+ // Please see KUDU-2334 for details.
+ bool started_;
+
bool aborted_;
DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);