You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/11/22 06:09:51 UTC

[1/2] kudu git commit: [security] test and fixes for TLS socket EINTR issues

Repository: kudu
Updated Branches:
  refs/heads/branch-1.5.x 5d8620fc8 -> d4a47487b


[security] test and fixes for TLS socket EINTR issues

SSL_{read,write}() can fail with SSL_ERROR_WANT_{READ,WRITE},
respectively, when a signal interrupts the underlying recv()/send()
calls, even if SSL_MODE_AUTO_RETRY is set in the TLS context.  To handle
that properly in the Socket::Blocking{Recv,Write}() methods, return
NetworkError() with the appropriate POSIX error code from
TlsSocket::{Recv,Write}().

As a by-product, this changelist fixes flakiness in the TestUniqueClientIds
scenario of the ClientStressTest test and in other flaky tests which failed
with errors like the one below:

  Bad status: IO error: Could not connect to the cluster: \
    Client connection negotiation failed: client connection to \
    IP:port: Read zero bytes on a blocking Recv() call: \
    Transferred 0 of 4 bytes

Prior to this fix, the test failure rate observed with dist-test
for TSAN builds was about 6% across multiple batches of 1K runs.
After the fix, no failures were observed.

While merging from the main trunk, the SCOPED_CLEANUP macro was
replaced with MakeScopedCleanup.

Change-Id: Ibec9049186f79f1c43295e4735538ed7ba4e516e
Reviewed-on: http://gerrit.cloudera.org:8080/8462
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
(cherry picked from commit 18e024cf8bcaea192efb63780802cc4c799bbb9c)
Reviewed-on: http://gerrit.cloudera.org:8080/8603
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/28fcab27
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/28fcab27
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/28fcab27

Branch: refs/heads/branch-1.5.x
Commit: 28fcab271db51acaf39f699d5f9f0d044edbf345
Parents: 5d8620f
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Nov 3 12:39:25 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Nov 22 06:08:22 2017 +0000

----------------------------------------------------------------------
 src/kudu/security/CMakeLists.txt     |   1 +
 src/kudu/security/tls_handshake.cc   |   1 -
 src/kudu/security/tls_socket-test.cc | 196 ++++++++++++++++++++++++++++++
 src/kudu/security/tls_socket.cc      |   9 ++
 src/kudu/util/net/socket.cc          |  27 +++-
 5 files changed, 227 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/28fcab27/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/security/CMakeLists.txt b/src/kudu/security/CMakeLists.txt
index 0dc7d0f..ea99a02 100644
--- a/src/kudu/security/CMakeLists.txt
+++ b/src/kudu/security/CMakeLists.txt
@@ -113,5 +113,6 @@ if (NOT NO_TESTS)
   ADD_KUDU_TEST(crypto-test)
   ADD_KUDU_TEST(test/mini_kdc-test)
   ADD_KUDU_TEST(tls_handshake-test)
+  ADD_KUDU_TEST(tls_socket-test)
   ADD_KUDU_TEST(token-test)
 endif()

http://git-wip-us.apache.org/repos/asf/kudu/blob/28fcab27/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_handshake.cc b/src/kudu/security/tls_handshake.cc
index 57f9c7c..6daea6b 100644
--- a/src/kudu/security/tls_handshake.cc
+++ b/src/kudu/security/tls_handshake.cc
@@ -116,7 +116,6 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
     DCHECK_GE(send->size(), 0);
     return Status::OK();
   }
-  DCHECK_GT(send->size(), 0);
   return Status::Incomplete("TLS Handshake incomplete");
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/28fcab27/src/kudu/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_socket-test.cc b/src/kudu/security/tls_socket-test.cc
new file mode 100644
index 0000000..f2b4ffd
--- /dev/null
+++ b/src/kudu/security/tls_socket-test.cc
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <pthread.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+
+namespace kudu {
+namespace security {
+
+
+class TlsSocketTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+
+    ASSERT_OK(client_tls_.Init());
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+  }
+
+ protected:
+  TlsContext client_tls_;
+  TlsContext server_tls_;
+};
+
+Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
+  tls->set_verification_mode(TlsVerificationMode::VERIFY_NONE);
+
+  bool done = false;
+  string received;
+  while (!done) {
+    string to_send;
+    Status s = tls->Continue(received, &to_send);
+    if (s.ok()) {
+      done = true;
+    } else if (!s.IsIncomplete()) {
+      RETURN_NOT_OK_PREPEND(s, "unexpected tls error");
+    }
+    if (!to_send.empty()) {
+      size_t nwritten;
+      auto deadline = MonoTime::Now() + MonoDelta::FromSeconds(10);
+      RETURN_NOT_OK_PREPEND(sock->BlockingWrite(
+          reinterpret_cast<const uint8_t*>(to_send.data()),
+          to_send.size(), &nwritten, deadline),
+                            "error sending");
+    }
+
+    if (!done) {
+      uint8_t buf[1024];
+      int32_t n = 0;
+      RETURN_NOT_OK_PREPEND(sock->Recv(buf, arraysize(buf), &n),
+                            "error receiving");
+      received = string(reinterpret_cast<char*>(&buf[0]), n);
+    }
+  }
+  LOG(INFO) << side << ": negotiation complete";
+  return Status::OK();
+}
+
+void handler(int /* signal */) {}
+
+// Test for failures to handle EINTR during TLS connection
+// negotiation and data send/receive.
+TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  Sockaddr listen_addr;
+  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
+  Socket listener;
+  ASSERT_OK(listener.Init(0));
+  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
+  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
+
+  // Set up a no-op signal handler for SIGUSR2.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+  auto sig_cleanup = MakeScopedCleanup([&]() { sigaction(SIGUSR2, &sa_old, nullptr); });
+
+  // Size is big enough to not fit into output socket buffer of default size
+  // (controlled by setsockopt() with SO_SNDBUF).
+  constexpr size_t kSize = 32 * 1024 * 1024;
+
+  pthread_t server_tid;
+  CountDownLatch server_tid_sync(1);
+  std::atomic<bool> stop { false };
+  thread server([&] {
+      server_tid = pthread_self();
+      server_tid_sync.CountDown();
+      unique_ptr<Socket> sock(new Socket());
+      Sockaddr remote;
+      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
+
+      TlsHandshake server;
+      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+      CHECK_OK(server.Finish(&sock));
+
+      CHECK_OK(sock->SetRecvTimeout(kTimeout));
+      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+      // An "echo" loop for kSize byte buffers.
+      while (!stop) {
+        size_t n;
+        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
+        if (s.ok()) {
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+        }
+        if (!s.ok()) {
+          CHECK(stop) << "unexpected error: " << s.ToString();
+        }
+      }
+    });
+  auto server_cleanup = MakeScopedCleanup([&]() { server.join(); });
+
+  // Start a thread to send signals to the server thread.
+  thread killer([&]() {
+    server_tid_sync.Wait();
+    while (!stop) {
+      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
+      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+    }
+  });
+  auto killer_cleanup = MakeScopedCleanup([&]() { killer.join(); });
+
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(listen_addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  for (int i = 0; i < 10; i++) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    size_t nwritten;
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+        MonoTime::Now() + kTimeout));
+    size_t n;
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+        MonoTime::Now() + kTimeout));
+  }
+  stop = true;
+  ASSERT_OK(client_sock->Close());
+
+  LOG(INFO) << "client done";
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/28fcab27/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_socket.cc b/src/kudu/security/tls_socket.cc
index 58addfc..273ebe6 100644
--- a/src/kudu/security/tls_socket.cc
+++ b/src/kudu/security/tls_socket.cc
@@ -55,9 +55,14 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
 
   errno = 0;
   int32_t bytes_written = SSL_write(ssl_.get(), buf, amt);
+  int save_errno = errno;
   if (bytes_written <= 0) {
     auto error_code = SSL_get_error(ssl_.get(), bytes_written);
     if (error_code == SSL_ERROR_WANT_WRITE) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_write error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Socket not ready to write yet.
       *nwritten = 0;
       return Status::OK();
@@ -105,6 +110,10 @@ Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
     }
     auto error_code = SSL_get_error(ssl_.get(), bytes_read);
     if (error_code == SSL_ERROR_WANT_READ) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_read error",
+                                    ErrnoToString(save_errno), save_errno);
+      }
       // Nothing available to read yet.
       *nread = 0;
       return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/28fcab27/src/kudu/util/net/socket.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/net/socket.cc b/src/kudu/util/net/socket.cc
index a7ccfcc..d2448f6 100644
--- a/src/kudu/util/net/socket.cc
+++ b/src/kudu/util/net/socket.cc
@@ -30,6 +30,7 @@
 #include <limits>
 #include <ostream>
 #include <string>
+#include <type_traits>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -60,6 +61,14 @@ DEFINE_bool(socket_inject_short_recvs, false,
 TAG_FLAG(socket_inject_short_recvs, hidden);
 TAG_FLAG(socket_inject_short_recvs, unsafe);
 
+// TODO(todd) consolidate with other copies of this!
+// Retry on EINTR for functions like read() that return -1 on error.
+#define RETRY_ON_EINTR(err, expr) do { \
+  static_assert(std::is_signed<decltype(err)>::value == true, \
+                #err " must be a signed integer"); \
+  (err) = (expr); \
+} while ((err) == -1 && errno == EINTR)
+
 namespace kudu {
 
 Socket::Socket()
@@ -342,20 +351,25 @@ Status Socket::Accept(Socket *new_conn, Sockaddr *remote, int flags) {
   if (flags & FLAG_NONBLOCKING) {
     accept_flags |= SOCK_NONBLOCK;
   }
-  new_conn->Reset(::accept4(fd_, (struct sockaddr*)&addr,
-                  &olen, accept_flags));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept4(fd_, (struct sockaddr*)&addr,
+                             &olen, accept_flags));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept4(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
+
 #else
-  new_conn->Reset(::accept(fd_, (struct sockaddr*)&addr, &olen));
-  if (new_conn->GetFd() < 0) {
+  int fd = -1;
+  RETRY_ON_EINTR(fd, accept(fd_, (struct sockaddr*)&addr, &olen));
+  if (fd < 0) {
     int err = errno;
     return Status::NetworkError(std::string("accept(2) error: ") +
                                 ErrnoToString(err), Slice(), err);
   }
+  new_conn->Reset(fd);
   RETURN_NOT_OK(new_conn->SetNonBlocking(flags & FLAG_NONBLOCKING));
   RETURN_NOT_OK(new_conn->SetCloseOnExec());
 #endif // defined(__linux__)
@@ -512,7 +526,8 @@ Status Socket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
   }
 
   DCHECK_GE(fd_, 0);
-  int res = ::recv(fd_, buf, amt, 0);
+  int res;
+  RETRY_ON_EINTR(res, recv(fd_, buf, amt, 0));
   if (res <= 0) {
     if (res == 0) {
       return Status::NetworkError("Recv() got EOF from remote", Slice(), ESHUTDOWN);


[2/2] kudu git commit: KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

Posted by to...@apache.org.
KUDU-2218. tls_socket: properly handle temporary socket errors in Writev

This fixes a bug which caused RaftConsensusITest.TestLargeBatches to
fail when run under stress, as in the following command line:

taskset -c 0-4 \
 build/latest/bin/raft_consensus-itest \
   --gtest_filter=\*LargeBat\* \
   --stress-cpu-threads=8

This would produce an error like:
Network error: failed to write to TLS socket: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry:s3_pkt.c:878

This means that we were retrying a write after getting EAGAIN, but with
a different buffer than the first time.

I tracked this down to mishandling of temporary socket errors in
TlsSocket::Writev(). When we successfully wrote part of the I/O
vector but hit such an error while writing a later element of the
vector, we still propagated the error back up to the caller. The
caller didn't realize that part of the write had succeeded, and thus
it would retry the write from the beginning.

The fix is to correct the above and also to enable partial writes
(SSL_MODE_ENABLE_PARTIAL_WRITE) in TlsContext. The new test fails if
either of these two changes is backed out.

While merging from the main trunk, the SCOPED_CLEANUP macro was replaced
with MakeScopedCleanup.

Change-Id: If797f220f42bfb2e6f452b66f15e7a758e883472
Reviewed-on: http://gerrit.cloudera.org:8080/8570
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
(cherry picked from commit 64eb9f37b171419ed12a3795efe28faf2fd33b3d)
Reviewed-on: http://gerrit.cloudera.org:8080/8604
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d4a47487
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d4a47487
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d4a47487

Branch: refs/heads/branch-1.5.x
Commit: d4a47487bb2e4cdceafb23686c2096090e4557ae
Parents: 28fcab2
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 22:55:44 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Nov 22 06:09:02 2017 +0000

----------------------------------------------------------------------
 src/kudu/security/tls_context.cc     |   2 +-
 src/kudu/security/tls_socket-test.cc | 277 ++++++++++++++++++++++--------
 src/kudu/security/tls_socket.cc      |  10 +-
 3 files changed, 219 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d4a47487/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_context.cc b/src/kudu/security/tls_context.cc
index 3ab0131..b02ce18 100644
--- a/src/kudu/security/tls_context.cc
+++ b/src/kudu/security/tls_context.cc
@@ -115,7 +115,7 @@ Status TlsContext::Init() {
   if (!ctx_) {
     return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
   }
-  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY);
+  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
 
   // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
   // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4a47487/src/kudu/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_socket-test.cc b/src/kudu/security/tls_socket-test.cc
index f2b4ffd..c24aaac 100644
--- a/src/kudu/security/tls_socket-test.cc
+++ b/src/kudu/security/tls_socket-test.cc
@@ -17,7 +17,11 @@
 
 #include "kudu/security/tls_handshake.h"
 
+#include <algorithm>
 #include <pthread.h>
+#include <sched.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
 
 #include <atomic>
 #include <csignal>
@@ -28,6 +32,7 @@
 #include <memory>
 #include <string>
 #include <thread>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -38,6 +43,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/net/socket.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
@@ -46,25 +53,27 @@
 using std::string;
 using std::thread;
 using std::unique_ptr;
-
+using std::vector;
 
 namespace kudu {
 namespace security {
 
+const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+
+// Size is big enough to not fit into output socket buffer of default size
+// (controlled by setsockopt() with SO_SNDBUF).
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
 
 class TlsSocketTest : public KuduTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
-
     ASSERT_OK(client_tls_.Init());
-    ASSERT_OK(server_tls_.Init());
-    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
   }
 
  protected:
+  void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);
   TlsContext client_tls_;
-  TlsContext server_tls_;
 };
 
 Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
@@ -101,19 +110,112 @@ Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
   return Status::OK();
 }
 
+void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
+  unique_ptr<Socket> client_sock(new Socket());
+  ASSERT_OK(client_sock->Init(0));
+  ASSERT_OK(client_sock->Connect(addr));
+
+  TlsHandshake client;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
+  ASSERT_OK(client.Finish(&client_sock));
+  *sock = std::move(client_sock);
+}
+
+class EchoServer {
+ public:
+  EchoServer()
+      : pthread_sync_(1) {
+  }
+  ~EchoServer() {
+    Stop();
+    Join();
+  }
+
+  void Start() {
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
+    ASSERT_OK(listener_.Init(0));
+    ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
+    ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
+
+    thread_ = thread([&] {
+        pthread_ = pthread_self();
+        pthread_sync_.CountDown();
+        unique_ptr<Socket> sock(new Socket());
+        Sockaddr remote;
+        CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
+
+        TlsHandshake server;
+        CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+        CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+        CHECK_OK(server.Finish(&sock));
+
+        CHECK_OK(sock->SetRecvTimeout(kTimeout));
+        unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+        // An "echo" loop for kEchoChunkSize byte buffers.
+        while (!stop_) {
+          size_t n;
+          Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+          }
+
+          LOG(INFO) << "server echoing " << n << " bytes";
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+          }
+          if (slow_read_) {
+            SleepFor(MonoDelta::FromMilliseconds(10));
+          }
+        }
+      });
+  }
+
+  void EnableSlowRead() {
+    slow_read_ = true;
+  }
+
+  const Sockaddr& listen_addr() const {
+    return listen_addr_;
+  }
+
+  bool stopped() const {
+    return stop_;
+  }
+
+  void Stop() {
+    stop_ = true;
+  }
+  void Join() {
+    thread_.join();
+  }
+
+  const pthread_t& pthread() {
+    pthread_sync_.Wait();
+    return pthread_;
+  }
+
+ private:
+  TlsContext server_tls_;
+  Socket listener_;
+  Sockaddr listen_addr_;
+  thread thread_;
+  pthread_t pthread_;
+  CountDownLatch pthread_sync_;
+  std::atomic<bool> stop_ { false };
+
+  bool slow_read_ = false;
+};
+
 void handler(int /* signal */) {}
 
 // Test for failures to handle EINTR during TLS connection
 // negotiation and data send/receive.
 TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-  Sockaddr listen_addr;
-  ASSERT_OK(listen_addr.ParseString("127.0.0.1", 0));
-  Socket listener;
-  ASSERT_OK(listener.Init(0));
-  ASSERT_OK(listener.BindAndListen(listen_addr, /*listen_queue_size=*/10));
-  ASSERT_OK(listener.GetSocketAddress(&listen_addr));
-
   // Set up a no-op signal handler for SIGUSR2.
   struct sigaction sa, sa_old;
   memset(&sa, 0, sizeof(sa));
@@ -121,76 +223,117 @@ TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
   sigaction(SIGUSR2, &sa, &sa_old);
   auto sig_cleanup = MakeScopedCleanup([&]() { sigaction(SIGUSR2, &sa_old, nullptr); });
 
-  // Size is big enough to not fit into output socket buffer of default size
-  // (controlled by setsockopt() with SO_SNDBUF).
-  constexpr size_t kSize = 32 * 1024 * 1024;
-
-  pthread_t server_tid;
-  CountDownLatch server_tid_sync(1);
-  std::atomic<bool> stop { false };
-  thread server([&] {
-      server_tid = pthread_self();
-      server_tid_sync.CountDown();
-      unique_ptr<Socket> sock(new Socket());
-      Sockaddr remote;
-      CHECK_OK(listener.Accept(sock.get(), &remote, /*flags=*/0));
-
-      TlsHandshake server;
-      CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
-      CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
-      CHECK_OK(server.Finish(&sock));
-
-      CHECK_OK(sock->SetRecvTimeout(kTimeout));
-      unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
-      // An "echo" loop for kSize byte buffers.
-      while (!stop) {
-        size_t n;
-        Status s = sock->BlockingRecv(buf.get(), kSize, &n, MonoTime::Now() + kTimeout);
-        if (s.ok()) {
-          size_t written;
-          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
-        }
-        if (!s.ok()) {
-          CHECK(stop) << "unexpected error: " << s.ToString();
-        }
-      }
-    });
-  auto server_cleanup = MakeScopedCleanup([&]() { server.join(); });
+  EchoServer server;
+  NO_FATALS(server.Start());
 
   // Start a thread to send signals to the server thread.
   thread killer([&]() {
-    server_tid_sync.Wait();
-    while (!stop) {
-      PCHECK(pthread_kill(server_tid, SIGUSR2) == 0);
-      SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
-    }
-  });
+      while (!server.stopped()) {
+        PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
+        SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+      }
+    });
   auto killer_cleanup = MakeScopedCleanup([&]() { killer.join(); });
 
-  unique_ptr<Socket> client_sock(new Socket());
-  ASSERT_OK(client_sock->Init(0));
-  ASSERT_OK(client_sock->Connect(listen_addr));
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
 
-  TlsHandshake client;
-  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
-  ASSERT_OK(DoNegotiationSide(client_sock.get(), &client, "client"));
-  ASSERT_OK(client.Finish(&client_sock));
-
-  unique_ptr<uint8_t[]> buf(new uint8_t[kSize]);
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
   for (int i = 0; i < 10; i++) {
     SleepFor(MonoDelta::FromMilliseconds(1));
     size_t nwritten;
-    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kSize, &nwritten,
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
         MonoTime::Now() + kTimeout));
     size_t n;
-    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kSize, &n,
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
         MonoTime::Now() + kTimeout));
   }
-  stop = true;
+  server.Stop();
   ASSERT_OK(client_sock->Close());
-
   LOG(INFO) << "client done";
 }
 
+// Return an iovec containing the same data as the buffer 'buf' with the length 'len',
+// but split into random-sized chunks. The chunks are sized randomly between 1 and
+// 'max_chunk_size' bytes.
+vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
+  vector<struct iovec> ret;
+  uint8_t* p = buf;
+  int rem = len;
+  while (rem > 0) {
+    int len = rng->Uniform(max_chunk_size) + 1;
+    len = std::min(len, rem);
+    ret.push_back({p, static_cast<size_t>(len)});
+    p += len;
+    rem -= len;
+  }
+  return ret;
+}
+
+// Regression test for KUDU-2218, a bug in which Writev would improperly handle
+// partial writes in non-blocking mode.
+TEST_F(TlsSocketTest, TestNonBlockingWritev) {
+  Random rng(GetRandomSeed32());
+
+  EchoServer server;
+  server.EnableSlowRead();
+  NO_FATALS(server.Start());
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  int sndbuf = 16 * 1024;
+  CHECK_ERR(setsockopt(client_sock->GetFd(), SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+  unique_ptr<uint8_t[]> rbuf(new uint8_t[kEchoChunkSize]);
+  RandomString(buf.get(), kEchoChunkSize, &rng);
+
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(client_sock->SetNonBlocking(true));
+
+    // Prepare an IOV with the input data split into a bunch of randomly-sized
+    // chunks.
+    vector<struct iovec> iov = ChunkIOVec(&rng, buf.get(), kEchoChunkSize, 1024 * 1024);
+
+    // Loop calling writev until the iov is exhausted
+    int rem = kEchoChunkSize;
+    while (rem > 0) {
+      CHECK(!iov.empty()) << rem;
+      int32_t n;
+      Status s = client_sock->Writev(&iov[0], iov.size(), &n);
+      if (Socket::IsTemporarySocketError(s.posix_code())) {
+        sched_yield();
+        continue;
+      }
+      ASSERT_OK(s);
+      rem -= n;
+      ASSERT_GE(n, 0);
+      while (n > 0) {
+        if (n < iov[0].iov_len) {
+          iov[0].iov_len -= n;
+          iov[0].iov_base = reinterpret_cast<uint8_t*>(iov[0].iov_base) + n;
+          n = 0;
+        } else {
+          n -= iov[0].iov_len;
+          iov.erase(iov.begin());
+        }
+      }
+    }
+    LOG(INFO) << "client waiting";
+
+    size_t n;
+    ASSERT_OK(client_sock->SetNonBlocking(false));
+    ASSERT_OK(client_sock->BlockingRecv(rbuf.get(), kEchoChunkSize, &n,
+        MonoTime::Now() + kTimeout));
+    LOG(INFO) << "client got response";
+
+    ASSERT_EQ(0, memcmp(buf.get(), rbuf.get(), kEchoChunkSize));
+  }
+
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+}
+
 } // namespace security
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/d4a47487/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_socket.cc b/src/kudu/security/tls_socket.cc
index 273ebe6..12e36f7 100644
--- a/src/kudu/security/tls_socket.cc
+++ b/src/kudu/security/tls_socket.cc
@@ -28,6 +28,7 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/net/socket.h"
 
 namespace kudu {
 namespace security {
@@ -45,11 +46,11 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
   CHECK(ssl_);
   SCOPED_OPENSSL_NO_PENDING_ERRORS;
 
+  *nwritten = 0;
   if (PREDICT_FALSE(amt == 0)) {
     // Writing an empty buffer is a no-op. This happens occasionally, eg in the
     // case where the response has an empty sidecar. We have to special case
     // it, because SSL_write can return '0' to indicate certain types of errors.
-    *nwritten = 0;
     return Status::OK();
   }
 
@@ -64,7 +65,6 @@ Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
                                     ErrnoToString(save_errno), save_errno);
       }
       // Socket not ready to write yet.
-      *nwritten = 0;
       return Status::OK();
     }
     return Status::NetworkError("failed to write to TLS socket",
@@ -93,6 +93,12 @@ Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int32_t *nwritt
   }
   RETURN_NOT_OK(SetTcpCork(0));
   *nwritten = total_written;
+  // If we did manage to write something, but not everything, due to a temporary socket
+  // error, then we should still return an OK status indicating a successful _partial_
+  // write.
+  if (total_written > 0 && Socket::IsTemporarySocketError(write_status.posix_code())) {
+    return Status::OK();
+  }
   return write_status;
 }