You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/21 01:43:26 UTC
[3/5] impala git commit: [security] test and fixes for TLS socket EINTR issues
[security] test and fixes for TLS socket EINTR issues
SSL_{read,write}() can return SSL_ERROR_WANT_{READ,WRITE}, respectively,
when a signal interrupts the underlying recv()/send() call, even if
SSL_MODE_AUTO_RETRY is set in the TLS context. To handle that
properly in the Socket::Blocking{Recv,Write}() methods,
TlsSocket::{Recv,Write}() now return NetworkError() with the
appropriate POSIX error code.
As a by-product, this changelist fixes flakiness in the TestUniqueClientIds
scenario of ClientStressTest and in other flaky tests that failed
with errors like the one below:
Bad status: IO error: Could not connect to the cluster: \
Client connection negotiation failed: client connection to \
IP:port: Read zero bytes on a blocking Recv() call: \
Transferred 0 of 4 bytes
Prior to this fix, the test failure rate observed with dist-test
for TSAN builds was about 6% in multiple runs of 1K iterations each.
After the fix, no failures were observed.
Change-Id: Ibec9049186f79f1c43295e4735538ed7ba4e516e
Reviewed-on: http://gerrit.cloudera.org:8080/8462
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9360
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3a68e69e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3a68e69e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3a68e69e
Branch: refs/heads/master
Commit: 3a68e69ec7117e697bd8964fe32ae63c7b95b319
Parents: 97f950c
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Nov 3 12:39:25 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 22:32:02 2018 +0000
----------------------------------------------------------------------
be/src/kudu/security/CMakeLists.txt | 1 +
be/src/kudu/security/tls_handshake.cc | 1 -
be/src/kudu/security/tls_socket.cc | 9 ++
be/src/kudu/util/net/socket.cc | 27 +++-
security/tls_socket-test.cc | 196 +++++++++++++++++++++++++++++
5 files changed, 227 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 8736e66..d3539de 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -122,5 +122,6 @@ if (NOT NO_TESTS)
ADD_KUDU_TEST(crypto-test)
ADD_KUDU_TEST(test/mini_kdc-test)
ADD_KUDU_TEST(tls_handshake-test)
+ ADD_KUDU_TEST(tls_socket-test)
ADD_KUDU_TEST(token-test)
endif()
http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake.cc b/be/src/kudu/security/tls_handshake.cc
index fdf3bb1..e1314bc 100644
--- a/be/src/kudu/security/tls_handshake.cc
+++ b/be/src/kudu/security/tls_handshake.cc
@@ -117,7 +117,6 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
DCHECK_GE(send->size(), 0);
return Status::OK();
}
- DCHECK_GT(send->size(), 0);
return Status::Incomplete("TLS Handshake incomplete");
}
http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 7aeca31..f725a49 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -52,9 +52,14 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
errno = 0;
int32_t bytes_written = SSL_write(ssl_.get(), buf, amt);
+ int save_errno = errno;
if (bytes_written <= 0) {
auto error_code = SSL_get_error(ssl_.get(), bytes_written);
if (error_code == SSL_ERROR_WANT_WRITE) {
+ if (save_errno != 0) {
+ return Status::NetworkError("SSL_write error",
+ ErrnoToString(save_errno), save_errno);
+ }
// Socket not ready to write yet.
*nwritten = 0;
return Status::OK();
@@ -102,6 +107,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
}
auto error_code = SSL_get_error(ssl_.get(), bytes_read);
if (error_code == SSL_ERROR_WANT_READ) {
+ if (save_errno != 0) {
+ return Status::NetworkError("SSL_read error",
+ ErrnoToString(save_errno), save_errno);
+ }
// Nothing available to read yet.
*nread = 0;
return Status::OK();
http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
index 60f3d4b..1603da7 100644
--- a/be/src/kudu/util/net/socket.cc
+++ b/be/src/kudu/util/net/socket.cc
@@ -27,6 +27,7 @@
#include <limits>
#include <string>
+#include <type_traits>
#include <glog/logging.h>
@@ -55,6 +56,14 @@ DEFINE_bool_hidden(socket_inject_short_recvs, false,
TAG_FLAG(socket_inject_short_recvs, hidden);
TAG_FLAG(socket_inject_short_recvs, unsafe);
+// TODO(todd) consolidate with other copies of this!
+// Store `expr` into `err`, re-evaluating it while it yields -1 with errno == EINTR.
+#define RETRY_ON_EINTR(err, expr) do { \
+ static_assert(std::is_signed<decltype(err)>::value == true, \
+ #err " must be a signed integer"); \
+ (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
namespace kudu {
Socket::Socket()
@@ -339,20 +348,25 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
if (flags & FLAG_NONBLOCKING) {
accept_flags |= SOCK_NONBLOCK;
}
- new_conn->Reset(::accept4(fd_, (struct sockaddr*)&addr,
- &olen, accept_flags));
- if (new_conn->GetFd() < 0) {
+ int fd = -1;
+ RETRY_ON_EINTR(fd, accept4(fd_, (struct sockaddr*)&addr,
+ &olen, accept_flags));
+ if (fd < 0) {
int err = errno;
return Status::NetworkError(std::string("accept4(2) error: ") +
ErrnoToString(err), Slice(), err);
}
+ new_conn->Reset(fd);
+
#else
- new_conn->Reset(::accept(fd_, (struct sockaddr*)&addr, &olen));
- if (new_conn->GetFd() < 0) {
+ int fd = -1;
+ RETRY_ON_EINTR(fd, accept(fd_, (struct sockaddr*)&addr, &olen));
+ if (fd < 0) {
int err = errno;
return Status::NetworkError(std::string("accept(2) error: ") +
ErrnoToString(err), Slice(), err);
}
+ new_conn->Reset(fd);
RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
RETURN_NOT_OK(new_conn->SetCloseOnExec());
#endif // defined(__linux__)
@@ -509,7 +523,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
}
DCHECK_GE(fd_, 0);
- int res = ::recv(fd_, buf, amt, 0);
+ int res;
+ RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
if (res <= 0) {
if (res == 0) {
return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN);
http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
new file mode 100644
index 0000000..a978e68
--- /dev/null
+++ b/security/tls_socket-test.cc
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <pthread.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+
+namespace kudu {
+namespace security {
+
+// Fixture: an initialized client TLS context and a server TLS context with a self-signed cert.
+class TlsSocketTest : public KuduTest {
+ public:
+ void SetUp() override {
+ KuduTest::SetUp();
+ // Initialize both contexts; only the server side gets a (self-signed) certificate.
+ ASSERT_OK(client_tls_.Init());
+ ASSERT_OK(server_tls_.Init());
+ ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+ }
+
+ protected:
+ TlsContext client_tls_;
+ TlsContext server_tls_;
+};
+// Drives handshake `tls` to completion over `sock` (peer verification off); `side` labels the log line.
+Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
+ tls->set_verification_mode(TlsVerificationMode::VERIFY_NONE);
+
+ bool done = false;
+ string received;
+ while (!done) {
+ string to_send;
+ Status s = tls->Continue(received, &to_send);
+ if (s.ok()) {
+ done = true;
+ } else if (!s.IsIncomplete()) {  // Incomplete: more round trips are needed.
+ RETURN_NOT_OK_PREPEND(s, "unexpected tls error");
+ }
+ if (!to_send.empty()) {  // Even a finished handshake may still have bytes to flush.
+ size_t nwritten;
+ auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(10);
+ RETURN_NOT_OK_PREPEND(sock->BlockingWrite(
+ reinterpret_cast<const uint8_t*>(to_send.data()),
+ to_send.size(), &nwritten, deadline),
+ "error sending");
+ }
+ // Not done yet: read up to 1024 bytes from the peer and feed them to the next Continue().
+ if (!done) {
+ uint8_t buf[1024];
+ int32_t n = 0;
+ RETURN_NOT_OK_PREPEND(sock->Recv(buf, arraysize(buf), &n),
+ "error receiving");
+ received = string(reinterpret_cast<char*>(&buf[0]), n);
+ }
+ }
+ LOG(INFO) << side << ": negotiation complete";
+ return Status::OK();
+}
+// No-op SIGUSR2 handler: the signal is caught (not fatal), so it can interrupt blocking syscalls.
+void handler(int /* signal */) {}
+
+// Regression test for EINTR handling during TLS negotiation and data transfer:
+// a helper thread keeps sending SIGUSR2 to the server thread throughout.
+TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+ Sockaddr listen_addr;
+ ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
+ Socket listener;
+ ASSERT_OK(listener.Init(0));
+ ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
+ ASSERT_OK(listener.GetSocketAddress(&listen_addr));
+
+ // No-op SIGUSR2 handler; sa_flags == 0 (no SA_RESTART), so interrupted syscalls fail with EINTR.
+ struct sigaction sa, sa_old;
+ memset(&sa, 0, sizeof(sa));
+ sa.sa_handler = &handler;
+ PCHECK(sigaction(SIGUSR2, &sa, &sa_old) == 0);
+ SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+
+ // Large enough not to fit into a default-sized socket send buffer (SO_SNDBUF),
+ // so each transfer spans many send()/recv() calls that the signals can interrupt.
+ constexpr size_t kSize = 32 * 1024 * 1024;
+
+ pthread_t server_tid;
+ CountDownLatch server_tid_sync(1);
+ std::atomic<bool> stop { false };
+ thread server([&] {
+ server_tid = pthread_self();
+ server_tid_sync.CountDown();
+ unique_ptr<Socket> sock(new Socket());
+ Sockaddr remote;
+ CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
+
+ TlsHandshake server;
+ CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+ CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+ CHECK_OK(server.Finish(&sock));
+
+ CHECK_OK(sock->SetRecvTimeout(kTimeout));
+ unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+ // Echo kSize-byte messages back; errors are expected only once `stop` is set.
+ while (!stop) {
+ size_t n;
+ Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
+ if (s.ok()) {
+ size_t written;
+ s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+ }
+ if (!s.ok()) {
+ CHECK(stop) << "unexpected error: " << s.ToString();
+ }
+ }
+ });
+ SCOPED_CLEANUP({ stop = true; server.join(); });  // Set stop so an early ASSERT return can't hang.
+
+ // Signal the server thread until `stop`; once stopping, it may already have exited (ESRCH).
+ thread killer([&]() {
+ server_tid_sync.Wait();
+ while (!stop) {
+ CHECK(pthread_kill(server_tid, SIGUSR2) == 0 || stop) << "pthread_kill() failed";
+ SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+ }
+ });
+ SCOPED_CLEANUP({ stop = true; killer.join(); });  // Set stop so an early ASSERT return can't hang.
+
+ unique_ptr<Socket> client_sock(new Socket());
+ ASSERT_OK(client_sock->Init(0));
+ ASSERT_OK(client_sock->Connect(listen_addr));
+
+ TlsHandshake client;
+ ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+ ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+ ASSERT_OK(client.Finish(&client_sock));
+
+ unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+ for (int i = 0; i < 10; i++) {
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ size_t nwritten;
+ ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+ MonoTime::Now() + kTimeout));
+ size_t n;
+ ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+ MonoTime::Now() + kTimeout));
+ }
+ stop = true;
+ ASSERT_OK(client_sock->Close());
+
+ LOG(INFO) << "client done";
+}
+
+} // namespace security
+} // namespace kudu