You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/21 01:43:26 UTC

[3/5] impala git commit: [security] test and fixes for TLS socket EINTR issues

[security] test and fixes for TLS socket EINTR issues

SSL_{read,write}() can return SSL_ERROR_WANT_{READ,WRITE}, respectively,
when a signal interrupts the recv()/send() call, even if
SSL_MODE_AUTO_RETRY is set in the TLS context.  To handle that
properly in the Socket::Blocking{Recv,Write}() methods, return
NetworkError() with the appropriate POSIX error code from
TlsSocket::{Recv,Write}().

As a by-product, this changelist fixes flakiness in the TestUniqueClientIds
scenario of the ClientStressTest test and in other flaky tests which failed
with errors like the one below:

  Bad status: IO error: Could not connect to the cluster: \
    Client connection negotiation failed: client connection to \
    IP:port: Read zero bytes on a blocking Recv() call: \
    Transferred 0 of 4 bytes

Prior to this fix, the test failure ratio observed with dist-test
for TSAN builds was about 6% in multiple 1K runs.  After the fix,
no failures were observed.

Change-Id: Ibec9049186f79f1c43295e4735538ed7ba4e516e
Reviewed-on: http://gerrit.cloudera.org:8080/8462
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9360
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3a68e69e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3a68e69e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3a68e69e

Branch: refs/heads/master
Commit: 3a68e69ec7117e697bd8964fe32ae63c7b95b319
Parents: 97f950c
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Nov 3 12:39:25 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 22:32:02 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/CMakeLists.txt   |   1 +
 be/src/kudu/security/tls_handshake.cc |   1 -
 be/src/kudu/security/tls_socket.cc    |   9 ++
 be/src/kudu/util/net/socket.cc        |  27 +++-
 security/tls_socket-test.cc           | 196 +++++++++++++++++++++++++++++
 5 files changed, 227 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 8736e66..d3539de 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -122,5 +122,6 @@ if (NOT NO_TESTS)
   ADD_KUDU_TEST(crypto-test)
   ADD_KUDU_TEST(test/mini_kdc-test)
   ADD_KUDU_TEST(tls_handshake-test)
+  ADD_KUDU_TEST(tls_socket-test)
   ADD_KUDU_TEST(token-test)
 endif()

http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake.cc b/be/src/kudu/security/tls_handshake.cc
index fdf3bb1..e1314bc 100644
--- a/be/src/kudu/security/tls_handshake.cc
+++ b/be/src/kudu/security/tls_handshake.cc
@@ -117,7 +117,6 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
     DCHECK_GE(send->size(), 0);
     return Status::OK();
   }
-  DCHECK_GT(send->size(), 0);
   return Status::Incomplete("TLS Handshake incomplete");
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index 7aeca31..f725a49 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -52,9 +52,14 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
 
   errno = 0;
   int32_t bytes_written = SSL_write(ssl_.get(), buf, amt);
+  int save_errno = errno;
   if (bytes_written <= 0) {
     auto error_code = SSL_get_error(ssl_.get(), bytes_written);
     if (error_code == SSL_ERROR_WANT_WRITE) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_write error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Socket not ready to write yet.
       *nwritten = 0;
       return Status::OK();
@@ -102,6 +107,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
     }
     auto error_code = SSL_get_error(ssl_.get(), bytes_read);
     if (error_code == SSL_ERROR_WANT_READ) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_read error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Nothing available to read yet.
       *nread = 0;
       return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/be/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/net/socket.cc b/be/src/kudu/util/net/socket.cc
index 60f3d4b..1603da7 100644
--- a/be/src/kudu/util/net/socket.cc
+++ b/be/src/kudu/util/net/socket.cc
@@ -27,6 +27,7 @@
 
 #include <limits>
 #include <string>
+#include <type_traits>
 
 #include <glog/logging.h>
 
@@ -55,6 +56,14 @@ DEFINE_bool_hidden(socket_inject_short_recvs, false,
 TAG_FLAG(socket_inject_short_recvs, hidden);
 TAG_FLAG(socket_inject_short_recvs, unsafe);
 
+// TODO(todd) consolidate with other copies of this!
+// Assigns 'expr' to 'err', re-evaluating while the result is -1 with errno == EINTR (as for read()).
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value == true, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
 namespace kudu {
 
 Socket::Socket()
@@ -339,20 +348,25 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
   if (flags & FLAG_NONBLOCKING) {
     accept_flags |= SOCK_NONBLOCK;
   }
-  new_conn->Reset(::accept4(fd_, (struct sockaddr*)&addr,
-                  &olen, accept_flags));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept4(fd_, (struct sockaddr*)&addr,
+                             &olen, accept_flags));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept4(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
+
 #else
-  new_conn->Reset(::accept(fd_, (struct sockaddr*)&addr, &olen));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept(fd_, (struct sockaddr*)&addr, &olen));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
   RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
   RETURN_NOT_OK(new_conn->SetCloseOnExec());
 #endif // defined(__linux__)
@@ -509,7 +523,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
   }
 
   DCHECK_GE(fd_, 0);
-  int res = ::recv(fd_, buf, amt, 0);
+  int res;
+  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
   if (res <= 0) {
     if (res == 0) {
       return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN);

http://git-wip-us.apache.org/repos/asf/impala/blob/3a68e69e/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
new file mode 100644
index 0000000..a978e68
--- /dev/null
+++ b/security/tls_socket-test.cc
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <pthread.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+
+namespace kudu {
+namespace security {
+
+
+class TlsSocketTest : public KuduTest {  // Fixture holding one TlsContext per connection side.
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    ASSERT_OK(client_tls_.Init());
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());  // Only the server context gets one.
+  }
+
+ protected:
+  TlsContext client_tls_;  // Starts the CLIENT-side handshake in the test.
+  TlsContext server_tls_;  // Starts the SERVER-side handshake in the test.
+};
+
+Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {  // 'side' labels the log.
+  tls->set_verification_mode(TlsVerificationMode::VERIFY_NONE);
+
+  bool done = false;
+  string received;  // Bytes from the previous Recv(); empty on the first iteration.
+  while (!done) {
+    string to_send;
+    Status s = tls->Continue(received, &to_send);  // Feed received bytes in, get bytes to send out.
+    if (s.ok()) {  // OK ends the loop, after any pending 'to_send' is written below.
+      done = true;
+    } else if (!s.IsIncomplete()) {  // Incomplete means "keep going"; any other status is returned.
+      RETURN_NOT_OK_PREPEND(s, "unexpected tls error");
+    }
+    if (!to_send.empty()) {
+      size_t nwritten;
+      auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(10);
+      RETURN_NOT_OK_PREPEND(sock->BlockingWrite(
+          reinterpret_cast<const uint8_t*>(to_send.data()),
+          to_send.size(), &nwritten, deadline),
+                            "error sending");
+    }
+
+    if (!done) {  // Still negotiating: read the peer's next message for the next Continue() call.
+      uint8_t buf[1024];
+      int32_t n = 0;
+      RETURN_NOT_OK_PREPEND(sock->Recv(buf, arraysize(buf), &n),
+                            "error receiving");
+      received = string(reinterpret_cast<char*>(&buf[0]), n);  // Keep only the 'n' bytes reported.
+    }
+  }
+  LOG(INFO) << side << ": negotiation complete";
+  return Status::OK();
+}
+
+void handler(int /* signal */) {}  // Intentionally empty signal handler; the argument is ignored.
+
+// Test for failures to handle EINTR: a killer thread keeps sending SIGUSR2 to the server
+// thread during TLS connection negotiation and while it echoes kSize-byte buffers.
+TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  Sockaddr listen_addr;
+  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
+  Socket listener;
+  ASSERT_OK(listener.Init(0));
+  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
+  ASSERT_OK(listener.GetSocketAddress(&listen_addr));  // Refreshed; the client connects to it below.
+
+  // Install a no-op SIGUSR2 handler; the previous disposition is restored on scope exit.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+
+  // The size is big enough to not fit into an output socket buffer of default size
+  // (the buffer size is controlled by setsockopt() with SO_SNDBUF).
+  constexpr size_t kSize = 32 * 1024 * 1024;
+
+  pthread_t server_tid;
+  CountDownLatch server_tid_sync(1);
+  std::atomic<bool> stop { false };
+  thread server([&] {
+      server_tid = pthread_self();  // Publish this thread's id for the killer thread.
+      server_tid_sync.CountDown();  // Releases the killer's Wait() now that server_tid is set.
+      unique_ptr<Socket> sock(new Socket());
+      Sockaddr remote;
+      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
+
+      TlsHandshake server;
+      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+      CHECK_OK(server.Finish(&sock));
+
+      CHECK_OK(sock->SetRecvTimeout(kTimeout));
+      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+      // An "echo" loop: receive into a kSize-byte buffer, then write back the 'n' bytes received.
+      while (!stop) {
+        size_t n;
+        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
+        if (s.ok()) {
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+        }
+        if (!s.ok()) {  // An error is fatal unless the client has already set 'stop'.
+          CHECK(stop) << "unexpected error: " << s.ToString();
+        }
+      }
+    });
+  SCOPED_CLEANUP({ server.join(); });
+
+  // Start a thread that repeatedly sends SIGUSR2 to the server thread until 'stop' is set.
+  thread killer([&]() {
+    server_tid_sync.Wait();  // Do not signal before server_tid has been assigned.
+    while (!stop) {
+      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
+      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));  // Random 0-9 us pause between signals.
+    }
+  });
+  SCOPED_CLEANUP({ killer.join(); });  // NOTE(review): may block if an ASSERT below fires first.
+
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(listen_addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);  // Never initialized; the test does not inspect it.
+  for (int i = 0; i < 10; i++) {  // Ten write-then-read round trips of kSize bytes each.
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    size_t nwritten;
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+        MonoTime::Now() + kTimeout));
+    size_t n;
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+        MonoTime::Now() + kTimeout));
+  }
+  stop = true;  // Lets the server echo loop and the killer loop terminate.
+  ASSERT_OK(client_sock->Close());
+
+  LOG(INFO) << "client done";
+}
+
+} // namespace security
+} // namespace kudu