You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/21 22:23:03 UTC

[2/4] impala git commit: KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

This fixes a bug which caused RaftConsensusITest.TestLargeBatches to
fail when run under stress, as in the following command line:

taskset -c 0-4 \
 build/latest/bin/raft_consensus-itest \
   --gtest_filter=\*LargeBat\* \
   --stress-cpu-threads=8

This would produce an error like:
Network error: failed to write to TLS socket: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry:s3_pkt.c:878

This means that we were retrying a write after getting EAGAIN, but with
a different buffer than the first time.

I tracked this down to mishandling of temporary socket errors in
TlsSocket::Writev(). In the case that we successfully write part of the
io vector but hit such an error trying to write a later element in the
vector, we were still propagating the error back up to the caller. The
caller didn't realize that part of the write was successful, and thus it
would retry the write from the beginning.

The fix is to fix the above, but also to enable partial writes in
TlsContext. The new test fails if either of the above two changes are
backed out.

Change-Id: If797f220f42bfb2e6f452b66f15e7a758e883472
Reviewed-on: http://gerrit.cloudera.org:8080/8570
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9361
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/678bf28e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/678bf28e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/678bf28e

Branch: refs/heads/master
Commit: 678bf28e233e667b05585110422762614840bdc2
Parents: baec8ca
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 22:55:44 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 21 04:33:55 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/security/tls_context.cc |   2 +-
 be/src/kudu/security/tls_socket.cc  |  10 +-
 security/tls_socket-test.cc         | 277 +++++++++++++++++++++++--------
 3 files changed, 219 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/be/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.cc b/be/src/kudu/security/tls_context.cc
index b6ec57f..f94e3d2 100644
--- a/be/src/kudu/security/tls_context.cc
+++ b/be/src/kudu/security/tls_context.cc
@@ -96,7 +96,7 @@ Status TlsContext::Init() {
   if (!ctx_) {
     return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
   }
-  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY);
+  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
 
   // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
   // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not

http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
index f725a49..e68cdce 100644
--- a/be/src/kudu/security/tls_socket.cc
+++ b/be/src/kudu/security/tls_socket.cc
@@ -25,6 +25,7 @@
 #include "kudu/security/cert.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/net/socket.h"
 
 namespace kudu {
 namespace security {
@@ -42,11 +43,11 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
   CHECK(ssl_);
   SCOPED_OPENSSL_NO_PENDING_ERRORS;
 
+  *nwritten = 0;
   if (PREDICT_FALSE(amt == 0)) {
     // Writing an empty buffer is a no-op. This happens occasionally, eg in the
     // case where the response has an empty sidecar. We have to special case
     // it, because SSL_write can return '0' to indicate certain types of errors.
-    *nwritten = 0;
     return Status::OK();
   }
 
@@ -61,7 +62,6 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
                                     ErrnoToString(save_errno), save_errno);
       }
       // Socket not ready to write yet.
-      *nwritten = 0;
       return Status::OK();
     }
     return Status::NetworkError("failed to write to TLS socket",
@@ -90,6 +90,12 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
   }
   RETURN_NOT_OK(SetTcpCork(0));
   *nwritten = total_written;
+  // If we did manage to write something, but not everything, due to a temporary socket
+  // error, then we should still return an OK status indicating a successful _partial_
+  // write.
+  if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
+    return Status::OK();
+  }
   return write_status;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/678bf28e/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/security/tls_socket-test.cc b/security/tls_socket-test.cc
index a978e68..214d2bf 100644
--- a/security/tls_socket-test.cc
+++ b/security/tls_socket-test.cc
@@ -17,7 +17,11 @@
 
 #include "kudu/security/tls_handshake.h"
 
+#include <algorithm>
 #include <pthread.h>
+#include <sched.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
 
 #include <atomic>
 #include <csignal>
@@ -28,6 +32,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -38,6 +43,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -46,25 +53,27 @@
 using std::string;
 using std::thread;
 using std::unique_ptr;
-
+using std::vector;
 
 namespace kudu {
 namespace security {
 
+const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+
+// Size is big enough to not fit into output socket buffer of default size
+// (controlled by setsockopt() with SO_SNDBUF).
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
 
 class TlsSocketTest : public KuduTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
-
     ASSERT_OK(client_tls_.Init());
-    ASSERT_OK(server_tls_.Init());
-    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
   }
 
  protected:
+  void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);
   TlsContext client_tls_;
-  TlsContext server_tls_;
 };
 
 Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
@@ -101,19 +110,112 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
   return Status::OK();
 }
 
+void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+  *sock = std::move(client_sock);
+}
+
+class EchoServer {
+ public:
+  EchoServer()
+      : pthread_sync_(1) {
+  }
+  ~EchoServer() {
+    Stop();
+    Join();
+  }
+
+  void Start() {
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
+    ASSERT_OK(listener_.Init(0));
+    ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
+    ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
+
+    thread_ = thread([&] {
+        pthread_ = pthread_self();
+        pthread_sync_.CountDown();
+        unique_ptr<Socket> sock(new Socket());
+        Sockaddr remote;
+        CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
+
+        TlsHandshake server;
+        CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+        CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+        CHECK_OK(server.Finish(&sock));
+
+        CHECK_OK(sock->SetRecvTimeout(kTimeout));
+        unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+        // An "echo" loop for kEchoChunkSize byte buffers.
+        while (!stop_) {
+          size_t n;
+          Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+          }
+
+          LOG(INFO) << "server echoing " << n << " bytes";
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+          }
+          if (slow_read_) {
+            SleepFor(MonoDelta::FromMilliseconds(10));
+          }
+        }
+      });
+  }
+
+  void EnableSlowRead() {
+    slow_read_ = true;
+  }
+
+  const Sockaddr& listen_addr() const {
+    return listen_addr_;
+  }
+
+  bool stopped() const {
+    return stop_;
+  }
+
+  void Stop() {
+    stop_ = true;
+  }
+  void Join() {
+    thread_.join();
+  }
+
+  const pthread_t& pthread() {
+    pthread_sync_.Wait();
+    return pthread_;
+  }
+
+ private:
+  TlsContext server_tls_;
+  Socket listener_;
+  Sockaddr listen_addr_;
+  thread thread_;
+  pthread_t pthread_;
+  CountDownLatch pthread_sync_;
+  std::atomic<bool> stop_ { false };
+
+  bool slow_read_ = false;
+};
+
 void handler(int /* signal */) {}
 
 // Test for failures to handle EINTR during TLS connection
 // negotiation and data send/receive.
 TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-  Sockaddr listen_addr;
-  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
-  Socket listener;
-  ASSERT_OK(listener.Init(0));
-  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
-  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
-
   // Set up a no-op signal handler for SIGUSR2.
   struct sigaction sa, sa_old;
   memset(&sa, 0, sizeof(sa));
@@ -121,76 +223,117 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
   sigaction(SIGUSR2, &sa, &sa_old);
   SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
 
-  // Size is big enough to not fit into output socket buffer of default size
-  // (controlled by setsockopt() with SO_SNDBUF).
-  constexpr size_t kSize = 32 * 1024 * 1024;
-
-  pthread_t server_tid;
-  CountDownLatch server_tid_sync(1);
-  std::atomic<bool> stop { false };
-  thread server([&] {
-      server_tid = pthread_self();
-      server_tid_sync.CountDown();
-      unique_ptr<Socket> sock(new Socket());
-      Sockaddr remote;
-      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
-
-      TlsHandshake server;
-      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
-      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
-      CHECK_OK(server.Finish(&sock));
-
-      CHECK_OK(sock->SetRecvTimeout(kTimeout));
-      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
-      // An "echo" loop for kSize byte buffers.
-      while (!stop) {
-        size_t n;
-        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
-        if (s.ok()) {
-          size_t written;
-          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
-        }
-        if (!s.ok()) {
-          CHECK(stop) << "unexpected error: " << s.ToString();
-        }
-      }
-    });
-  SCOPED_CLEANUP({ server.join(); });
+  EchoServer server;
+  NO_FATALS(server.Start());
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-    server_tid_sync.Wait();
-    while (!stop) {
-      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
-      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
-    }
-  });
+      while (!server.stopped()) {
+        PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
+        SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+      }
+    });
   SCOPED_CLEANUP({ killer.join(); });
 
-  unique_ptr<Socket> client_sock(new Socket());
-  ASSERT_OK(client_sock->Init(0));
-  ASSERT_OK(client_sock->Connect(listen_addr));
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
 
-  TlsHandshake client;
-  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
-  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
-  ASSERT_OK(client.Finish(&client_sock));
-
-  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
   for (int i = 0; i < 10; i++) {
     SleepFor(MonoDelta::FromMilliseconds(1));
     size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
         MonoTime::Now() + kTimeout));
     size_t n;
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
         MonoTime::Now() + kTimeout));
   }
-  stop = true;
+  server.Stop();
   ASSERT_OK(client_sock->Close());
-
   LOG(INFO) << "client done";
 }
 
+// Return an iovec containing the same data as the buffer 'buf' with the length 'len',
+// but split into random-sized chunks. The chunks are sized randomly between 1 and
+// 'max_chunk_size' bytes.
+vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
+  vector<struct iovec> ret;
+  uint8_t* p = buf;
+  int rem = len;
+  while (rem > 0) {
+    int len = rng->Uniform(max_chunk_size) + 1;
+    len = std::min(len, rem);
+    ret.push_back({p, static_cast<size_t>(len)});
+    p += len;
+    rem -= len;
+  }
+  return ret;
+}
+
+// Regression test for KUDU-2218, a bug in which Writev would improperly handle
+// partial writes in non-blocking mode.
+TEST_F(TlsSocketTest, TestNonBlockingWritev) {
+  Random rng(GetRandomSeed32());
+
+  EchoServer server;
+  server.EnableSlowRead();
+  NO_FATALS(server.Start());
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  int sndbuf = 16 * 1024;
+  CHECK_ERR(setsockopt(client_sock->GetFd(), SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+  unique_ptr<uint8_t[]> rbuf(new uint8_t[kEchoChunkSize]);
+  RandomString(buf.get(), kEchoChunkSize, &rng);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(client_sock->SetNonBlocking(true));
+
+    // Prepare an IOV with the input data split into a bunch of randomly-sized
+    // chunks.
+    vector<struct iovec> iov = ChunkIOVec(&rng, buf.get(), kEchoChunkSize, 1024 * 1024);
+
+    // Loop calling writev until the iov is exhausted
+    int rem = kEchoChunkSize;
+    while (rem > 0) {
+      CHECK(!iov.empty()) << rem;
+      int32_t n;
+      Status s = client_sock->Writev(&iov[0], iov.size(), &n);
+      if (Socket::IsTemporarySocketError(s.posix_code())) {
+        sched_yield();
+        continue;
+      }
+      ASSERT_OK(s);
+      rem -= n;
+      ASSERT_GE(n, 0);
+      while (n > 0) {
+        if (n < iov[0].iov_len) {
+          iov[0].iov_len -= n;
+          iov[0].iov_base = reinterpret_cast<uint8_t*>(iov[0].iov_base) + n;
+          n = 0;
+        } else {
+          n -= iov[0].iov_len;
+          iov.erase(iov.begin());
+        }
+      }
+    }
+    LOG(INFO) << "client waiting";
+
+    size_t n;
+    ASSERT_OK(client_sock->SetNonBlocking(false));
+    ASSERT_OK(client_sock->BlockingRecv(rbuf.get(), kEchoChunkSize, &n,
+        MonoTime::Now() + kTimeout));
+    LOG(INFO) << "client got response";
+
+    ASSERT_EQ(0, memcmp(buf.get(), rbuf.get(), kEchoChunkSize));
+  }
+
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+}
+
 } // namespace security
 } // namespace kudu