You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:44 UTC

[32/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/test/test_certs.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/test/test_certs.h b/be/src/kudu/security/test/test_certs.h
new file mode 100644
index 0000000..7767cb2
--- /dev/null
+++ b/be/src/kudu/security/test/test_certs.h
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+namespace kudu {
+class Status;
+
+namespace security {
+
+//
+// Set of certificates and private keys used for certificate generation
+// and signing tests (declarations).  See the .cc file for the actual data.
+//
+
+// Valid root CA certificate (PEM format).
+extern const char kCaCert[];
+// The private key (RSA, 2048 bits) for the certificate above.
+// This is 2048 bit RSA key, in PEM format.
+extern const char kCaPrivateKey[];
+// The public part of the abovementioned private key.
+extern const char kCaPublicKey[];
+
+// Expired root CA certificate (PEM format).
+extern const char kCaExpiredCert[];
+// The private key for the expired CA certificate described above.
+// This is 2048 bit RSA key, in PEM format.
+extern const char kCaExpiredPrivateKey[];
+// The public part of the abovementioned private key.
+extern const char kCaExpiredPublicKey[];
+// Certificate with multiple DNS hostnames in the SAN field.
+extern const char kCertDnsHostnamesInSan[];
+
+extern const char kDataTiny[];
+extern const char kSignatureTinySHA512[];
+
+extern const char kDataShort[];
+extern const char kSignatureShortSHA512[];
+
+extern const char kDataLong[];
+extern const char kSignatureLongSHA512[];
+
+// Creates a matching SSL certificate and unencrypted private key file in 'dir',
+// returning their paths in '*cert_file' and '*key_file'.
+Status CreateTestSSLCertWithPlainKey(const std::string& dir,
+                                     std::string* cert_file,
+                                     std::string* key_file);
+
+// Same as the CreateTestSSLCertWithPlainKey() except that the private key is
+// encrypted with a password that is returned in 'key_password'.
+Status CreateTestSSLCertWithEncryptedKey(const std::string& dir,
+                                         std::string* cert_file,
+                                         std::string* key_file,
+                                         std::string* key_password);
+
+// Same as the CreateTestSSLCertWithPlainKey() except that the 'cert_file' is
+// signed by a CA chain ('ca_cert_file' is a chain of certificates).
+Status CreateTestSSLCertSignedByChain(const std::string& dir,
+                                      std::string* cert_file,
+                                      std::string* key_file,
+                                      std::string* ca_cert_file);
+
+// Same as the CreateTestSSLCertWithPlainKey() except that the 'cert_file' is
+// a chain signed by a root CA ('ca_cert_file' is only the root CA).
+Status CreateTestSSLCertWithChainSignedByRoot(const std::string& dir,
+                                              std::string* cert_file,
+                                              std::string* key_file,
+                                              std::string* ca_cert_file);
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/test/test_pass.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/test/test_pass.cc b/be/src/kudu/security/test/test_pass.cc
new file mode 100644
index 0000000..f99ab3c
--- /dev/null
+++ b/be/src/kudu/security/test/test_pass.cc
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/test/test_pass.h"
+
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+
+using std::string;
+
+namespace kudu {
+namespace security {
+
+// Writes a one-entry htpasswd file named "test.passwd" into 'dir' and stores
+// the resulting path in '*passwd_file'. Returns the status of the file write.
+Status CreateTestHTPasswd(const string& dir,
+                          string* passwd_file) {
+  // Single entry in 'user:realm:digest' form. The digest is generated from the
+  // user/password pair in kTestAuthString.
+  const char* const kHTPasswd = "test:mydomain.com:8b6f595afb3c037b7bd79b89d9576d06";
+
+  *passwd_file = JoinPathSegments(dir, "test.passwd");
+  return WriteStringToFile(Env::Default(), kHTPasswd, *passwd_file);
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/test/test_pass.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/test/test_pass.h b/be/src/kudu/security/test/test_pass.h
new file mode 100644
index 0000000..5f730fe
--- /dev/null
+++ b/be/src/kudu/security/test/test_pass.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace security {
+
+// Username and password for HTTP authentication, corresponding to
+// .htpasswd created by CreateTestHTPasswd()
+const std::string kTestAuthString = "test:test";
+
+// Creates .htpasswd for HTTP basic authentication in the format
+// of 'user:realm:digest', returning the path in '*passwd_file'.
+Status CreateTestHTPasswd(const std::string &dir,
+                          std::string *passwd_file);
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.cc b/be/src/kudu/security/tls_context.cc
new file mode 100644
index 0000000..9bf433d
--- /dev/null
+++ b/be/src/kudu/security/tls_context.cc
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_context.h"
+
+#include <algorithm>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <openssl/err.h>
+#include <openssl/ssl.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/ca/cert_management.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/init.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/security_flags.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/user.h"
+
+// Hard code OpenSSL flag values from OpenSSL 1.0.1e[1][2] when compiling
+// against OpenSSL 1.0.0 and below. We detect when running against a too-old
+// version of OpenSSL using these definitions at runtime so that Kudu has full
+// functionality when run against a new OpenSSL version, even if it's compiled
+// against an older version.
+//
+// [1]: https://github.com/openssl/openssl/blob/OpenSSL_1_0_1e/ssl/ssl.h#L605-L609
+// [2]: https://github.com/openssl/openssl/blob/OpenSSL_1_0_1e/ssl/tls1.h#L166-L172
+#ifndef SSL_OP_NO_TLSv1
+#define SSL_OP_NO_TLSv1 0x04000000U
+#endif
+#ifndef SSL_OP_NO_TLSv1_1
+#define SSL_OP_NO_TLSv1_1 0x10000000U
+#endif
+#ifndef TLS1_1_VERSION
+#define TLS1_1_VERSION 0x0302
+#endif
+#ifndef TLS1_2_VERSION
+#define TLS1_2_VERSION 0x0303
+#endif
+
+using strings::Substitute;
+using std::string;
+using std::unique_lock;
+using std::vector;
+
+DEFINE_int32(ipki_server_key_size, 2048,
+             "the number of bits for server cert's private key. The server cert "
+             "is used for TLS connections to and from clients and other servers.");
+TAG_FLAG(ipki_server_key_size, experimental);
+
+namespace kudu {
+namespace security {
+
+using ca::CertRequestGenerator;
+
+// SslTypeTraits specializations naming the OpenSSL function that releases
+// each of the handle types used in this file.
+template<> struct SslTypeTraits<SSL> {
+  static constexpr auto kFreeFunc = SSL_free;
+};
+template<> struct SslTypeTraits<X509_STORE_CTX> {
+  static constexpr auto kFreeFunc = X509_STORE_CTX_free;
+};
+
+namespace {
+
+// Checks that the OpenSSL library in use can speak 'tls_version'.
+// 'tls_version_str' is the human-readable name used in the error message.
+// Returns InvalidArgument if the library's highest supported version is lower
+// than 'tls_version', OK otherwise.
+Status CheckMaxSupportedTlsVersion(int tls_version, const char* tls_version_str) {
+  // The check is compiled in only for OpenSSL older than 1.1; newer releases
+  // support every TLS version we care about.
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  const auto library_max_version = SSLv23_method()->version;
+  DCHECK_GE(library_max_version, TLS1_VERSION);
+
+  if (tls_version > library_max_version) {
+    return Status::InvalidArgument(
+        Substitute("invalid minimum TLS protocol version (--rpc_tls_min_protocol): "
+                   "this platform does not support $0", tls_version_str));
+  }
+#endif
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Default construction: identical to the two-argument constructor, using the
+// cipher list and minimum protocol version from SecurityDefaults.
+TlsContext::TlsContext()
+    : TlsContext(std::string(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+                 std::string(kudu::security::SecurityDefaults::kDefaultTlsMinVersion)) {
+}
+
+// Records the requested cipher list and minimum TLS protocol version, starts
+// with no certificate and no trusted certs, and makes sure OpenSSL has been
+// initialized. The SSL_CTX itself is created later by Init().
+TlsContext::TlsContext(std::string tls_ciphers, std::string tls_min_protocol)
+    : tls_ciphers_(std::move(tls_ciphers)),
+      tls_min_protocol_(std::move(tls_min_protocol)),
+      lock_(RWMutex::Priority::PREFER_READING),
+      trusted_cert_count_(0),
+      has_cert_(false),
+      is_external_cert_(false) {
+  security::InitializeOpenSSL();
+}
+
+// Creates the underlying SSL_CTX and configures its mode, protocol options,
+// cipher list and (for older OpenSSL versions) ECDH curve selection.
+//
+// Must be called while no SSL_CTX exists yet (CHECK-enforced). Returns
+// RuntimeError if the context cannot be created, and InvalidArgument if
+// 'tls_min_protocol_' is not one of "TLSv1", "TLSv1.1" or "TLSv1.2"
+// (compared case-insensitively) or names a version the library cannot support.
+Status TlsContext::Init() {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(!ctx_);
+
+  // NOTE: 'SSLv23 method' sounds like it would enable only SSLv2 and SSLv3, but in fact
+  // this is a sort of wildcard which enables all methods (including TLSv1 and later).
+  // We explicitly disable SSLv2 and SSLv3 below so that only TLS methods remain.
+  // See the discussion on https://trac.torproject.org/projects/tor/ticket/11598 for more
+  // info.
+  ctx_ = ssl_make_unique(SSL_CTX_new(SSLv23_method()));
+  if (!ctx_) {
+    return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
+  }
+  SSL_CTX_set_mode(ctx_.get(), SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE);
+
+  // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
+  // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not
+  // support TLSv1.1 or later.
+  //
+  // Disable SSL/TLS compression to free up CPU resources and be less prone
+  // to attacks exploiting the compression feature:
+  //   https://tools.ietf.org/html/rfc7525#section-3.3
+  auto options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION;
+
+  // Translate the configured minimum protocol into "disable everything older"
+  // option bits. "TLSv1" needs no extra bits; anything unrecognized is an error.
+  if (boost::iequals(tls_min_protocol_, "TLSv1.2")) {
+    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_2_VERSION, "TLSv1.2"));
+    options |= SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1;
+  } else if (boost::iequals(tls_min_protocol_, "TLSv1.1")) {
+    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_1_VERSION, "TLSv1.1"));
+    options |= SSL_OP_NO_TLSv1;
+  } else if (!boost::iequals(tls_min_protocol_, "TLSv1")) {
+    return Status::InvalidArgument("unknown value provided for --rpc_tls_min_protocol flag",
+                                   tls_min_protocol_);
+  }
+
+  SSL_CTX_set_options(ctx_.get(), options);
+
+  OPENSSL_RET_NOT_OK(
+      SSL_CTX_set_cipher_list(ctx_.get(), tls_ciphers_.c_str()),
+      "failed to set TLS ciphers");
+
+  // Enable ECDH curves. For OpenSSL 1.1.0 and up, this is done automatically.
+#ifndef OPENSSL_NO_ECDH
+#if OPENSSL_VERSION_NUMBER < 0x10002000L
+  // OpenSSL 1.0.1 and below only support setting a single ECDH curve at once.
+  // We choose prime256v1 because it's the first curve listed in the "modern
+  // compatibility" section of the Mozilla Server Side TLS recommendations,
+  // accessed Feb. 2017.
+  c_unique_ptr<EC_KEY> ecdh { EC_KEY_new_by_curve_name(NID_X9_62_prime256v1), &EC_KEY_free };
+  OPENSSL_RET_IF_NULL(ecdh, "failed to create prime256v1 curve");
+  OPENSSL_RET_NOT_OK(SSL_CTX_set_tmp_ecdh(ctx_.get(), ecdh.get()),
+                     "failed to set ECDH curve");
+#elif OPENSSL_VERSION_NUMBER < 0x10100000L
+  // OpenSSL 1.0.2 provides the set_ecdh_auto API which internally figures out
+  // the best curve to use.
+  OPENSSL_RET_NOT_OK(SSL_CTX_set_ecdh_auto(ctx_.get(), 1),
+                     "failed to configure ECDH support");
+#endif
+#endif
+
+  // TODO(KUDU-1926): is it possible to disable client-side renegotiation? it seems there
+  // have been various CVEs related to this feature that we don't need.
+  return Status::OK();
+}
+
+// Verifies 'cert' (and the rest of its chain) against the certificates in this
+// context's certificate store.
+//
+// A self-signed leaf certificate is accepted. Any other verification failure
+// yields RuntimeError, including the subject and issuer of the certificate
+// OpenSSL reported as failing when it is available. Also returns RuntimeError
+// if the verification context cannot be allocated or initialized.
+Status TlsContext::VerifyCertChainUnlocked(const Cert& cert) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  X509_STORE* store = SSL_CTX_get_cert_store(ctx_.get());
+  auto store_ctx = ssl_make_unique<X509_STORE_CTX>(X509_STORE_CTX_new());
+  // X509_STORE_CTX_new() returns null on allocation failure; bail out rather
+  // than handing a null context to X509_STORE_CTX_init().
+  OPENSSL_RET_IF_NULL(store_ctx, "could not allocate X509_STORE_CTX");
+
+  OPENSSL_RET_NOT_OK(X509_STORE_CTX_init(store_ctx.get(), store, cert.GetTopOfChainX509(),
+                     cert.GetRawData()), "could not init X509_STORE_CTX");
+  int rc = X509_verify_cert(store_ctx.get());
+  if (rc != 1) {
+    int err = X509_STORE_CTX_get_error(store_ctx.get());
+    if (err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT) {
+      // It's OK to provide a self-signed cert.
+      ERR_clear_error(); // in case it left anything on the queue.
+      return Status::OK();
+    }
+
+    // Get the cert that failed to verify.
+    X509* cur_cert = X509_STORE_CTX_get_current_cert(store_ctx.get());
+    string cert_details;
+    if (cur_cert) {
+      cert_details = Substitute(" (error with cert: subject=$0, issuer=$1)",
+                                X509NameToString(X509_get_subject_name(cur_cert)),
+                                X509NameToString(X509_get_issuer_name(cur_cert)));
+    }
+
+    ERR_clear_error(); // in case it left anything on the queue.
+    return Status::RuntimeError(
+        Substitute("could not verify certificate chain$0", cert_details),
+        X509_verify_cert_error_string(err));
+  }
+  return Status::OK();
+}
+
+// Installs 'cert' and 'key' as this context's certificate and private key.
+//
+// Fails if the key does not match the cert, if the cert chain cannot be
+// verified against the current trust store, or if OpenSSL rejects either
+// object. CHECK-fails if a certificate has already been set on this context.
+Status TlsContext::UseCertificateAndKey(const Cert& cert, const PrivateKey& key) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Verify that the cert and key match.
+  RETURN_NOT_OK(cert.CheckKeyMatch(key));
+
+  // Exclusive lock: everything below reads or mutates shared context state.
+  std::unique_lock<RWMutex> lock(lock_);
+
+  // Verify that the appropriate CA certs have been loaded into the context
+  // before we adopt a cert. Otherwise, client connections without the CA cert
+  // available would fail.
+  RETURN_NOT_OK(VerifyCertChainUnlocked(cert));
+
+  CHECK(!has_cert_);
+
+  OPENSSL_RET_NOT_OK(SSL_CTX_use_PrivateKey(ctx_.get(), key.GetRawData()),
+                     "failed to use private key");
+  OPENSSL_RET_NOT_OK(SSL_CTX_use_certificate(ctx_.get(), cert.GetTopOfChainX509()),
+                     "failed to use certificate");
+  has_cert_ = true;
+  return Status::OK();
+}
+
+// Adds every certificate in the chain held by 'cert' to this context's trust
+// store.
+//
+// Certificates that are already present in the store are skipped; the
+// remaining certificates of the chain are still added. 'trusted_cert_count_'
+// is incremented once per call, and only if at least one certificate was
+// actually added. Returns an error if OpenSSL fails to add a certificate for
+// any reason other than it already being present.
+Status TlsContext::AddTrustedCertificate(const Cert& cert) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  VLOG(2) << "Trusting certificate " << cert.SubjectName();
+
+  {
+    // Workaround for a leak in OpenSSL <1.0.1:
+    //
+    // If we start trusting a cert, and its internal public-key field hasn't
+    // yet been populated, then the first time it's used for verification will
+    // populate it. In the case that two threads try to populate it at the same time,
+    // one of the thread's copies will be leaked.
+    //
+    // To avoid triggering the race, we populate the internal public key cache
+    // field up front before adding it to the trust store.
+    //
+    // See OpenSSL commit 33a688e80674aaecfac6d9484ec199daa0ee5b61.
+    PublicKey k;
+    CHECK_OK(cert.GetPublicKey(&k));
+  }
+
+  unique_lock<RWMutex> lock(lock_);
+  auto* cert_store = SSL_CTX_get_cert_store(ctx_.get());
+
+  // Iterate through the certificate chain and add each individual certificate to the store.
+  bool added_new_cert = false;
+  for (int i = 0; i < cert.chain_len(); ++i) {
+    X509* inner_cert = sk_X509_value(cert.GetRawData(), i);
+    int rc = X509_STORE_add_cert(cert_store, inner_cert);
+    if (rc <= 0) {
+      // Ignore the common case of re-adding a cert that is already in the
+      // trust store, but keep going: later certs in the chain may still be
+      // missing from the store.
+      auto err = ERR_peek_error();
+      if (ERR_GET_LIB(err) == ERR_LIB_X509 &&
+          ERR_GET_REASON(err) == X509_R_CERT_ALREADY_IN_HASH_TABLE) {
+        ERR_clear_error();
+        continue;
+      }
+      OPENSSL_RET_NOT_OK(rc, "failed to add trusted certificate");
+    }
+    added_new_cert = true;
+  }
+  if (added_new_cert) {
+    trusted_cert_count_ += 1;
+  }
+  return Status::OK();
+}
+
+// Serializes every X509 certificate in this context's certificate store to DER
+// and returns the results in '*cert_ders'.
+//
+// '*cert_ders' is replaced only on success; if any certificate fails to
+// serialize, the error is returned and '*cert_ders' is left untouched.
+Status TlsContext::DumpTrustedCerts(vector<string>* cert_ders) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  shared_lock<RWMutex> lock(lock_);
+
+  vector<string> ret;
+  auto* cert_store = SSL_CTX_get_cert_store(ctx_.get());
+
+  // The store's object list is reached through direct struct access and the
+  // global X509_STORE lock before OpenSSL 1.1, and through accessor functions
+  // from 1.1 on. These macros hide the difference from the code below.
+  // NOTE: they are not #undef'd at the end of this function.
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+#define STORE_LOCK(CS) CRYPTO_w_lock(CRYPTO_LOCK_X509_STORE)
+#define STORE_UNLOCK(CS) CRYPTO_w_unlock(CRYPTO_LOCK_X509_STORE)
+#define STORE_GET_X509_OBJS(CS) (CS)->objs
+#define X509_OBJ_GET_TYPE(X509_OBJ) (X509_OBJ)->type
+#define X509_OBJ_GET_X509(X509_OBJ) (X509_OBJ)->data.x509
+#else
+#define STORE_LOCK(CS) CHECK_EQ(1, X509_STORE_lock(CS)) << "Could not lock certificate store"
+#define STORE_UNLOCK(CS) CHECK_EQ(1, X509_STORE_unlock(CS)) << "Could not unlock certificate store"
+#define STORE_GET_X509_OBJS(CS) X509_STORE_get0_objects(CS)
+#define X509_OBJ_GET_TYPE(X509_OBJ) X509_OBJECT_get_type(X509_OBJ)
+#define X509_OBJ_GET_X509(X509_OBJ) X509_OBJECT_get0_X509(X509_OBJ)
+#endif
+
+  // Hold the store lock for the whole walk; the scoped cleanup releases it on
+  // every exit path, including the early error return inside the loop.
+  STORE_LOCK(cert_store);
+  auto unlock = MakeScopedCleanup([&]() {
+      STORE_UNLOCK(cert_store);
+    });
+  auto* objects = STORE_GET_X509_OBJS(cert_store);
+  int num_objects = sk_X509_OBJECT_num(objects);
+  for (int i = 0; i < num_objects; i++) {
+    auto* obj = sk_X509_OBJECT_value(objects, i);
+    // Skip store entries that are not X509 certificates.
+    if (X509_OBJ_GET_TYPE(obj) != X509_LU_X509) continue;
+    auto* x509 = X509_OBJ_GET_X509(obj);
+    Cert c;
+    c.AdoptAndAddRefX509(x509);
+    string der;
+    RETURN_NOT_OK(c.ToString(&der, DataFormat::DER));
+    ret.emplace_back(std::move(der));
+  }
+
+  cert_ders->swap(ret);
+  return Status::OK();
+}
+
+namespace {
+// Fills in the identity fields of 'config' for a certificate signing request:
+// the hostname (this machine's FQDN), the user id, and, when a Kerberos
+// principal is available from a keytab login, the principal itself.
+Status SetCertAttributes(CertRequestGenerator::Config* config) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  RETURN_NOT_OK_PREPEND(GetFQDN(&config->hostname), "could not determine FQDN for CSR");
+
+  // With a keytab login we have a 'real' identity: the user id is the local
+  // username mapped from the Kerberos principal. Without one, fall back to the
+  // locally logged-in user.
+  const boost::optional<string> principal = GetLoggedInPrincipalFromKeytab();
+  string local_user;
+  if (principal) {
+    RETURN_NOT_OK_PREPEND(security::MapPrincipalToLocalName(*principal, &local_user),
+                          "could not get local username for krb5 principal");
+    config->user_id = local_user;
+    config->kerberos_principal = *principal;
+  } else {
+    RETURN_NOT_OK_PREPEND(GetLoggedInUser(&local_user),
+                          "couldn't get local username");
+    config->user_id = local_user;
+  }
+  return Status::OK();
+}
+} // anonymous namespace
+
+// Generates a fresh private key (size taken from --ipki_server_key_size), a
+// CSR for it, and a self-signed certificate, then installs the key and the
+// certificate into the SSL_CTX and remembers the CSR in 'csr_'.
+//
+// CHECK-fails if this context already has a certificate. Returns an error if
+// key generation, CSR generation, self-signing, or installation into the
+// SSL_CTX fails.
+Status TlsContext::GenerateSelfSignedCertAndKey() {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Step 1: generate the private key to be self signed.
+  PrivateKey key;
+  RETURN_NOT_OK_PREPEND(GeneratePrivateKey(FLAGS_ipki_server_key_size,
+                                           &key),
+                                           "failed to generate private key");
+
+  // Step 2: generate a CSR so that the self-signed cert can eventually be
+  // replaced with a CA-signed cert.
+  CertRequestGenerator::Config config;
+  RETURN_NOT_OK(SetCertAttributes(&config));
+  CertRequestGenerator gen(config);
+  RETURN_NOT_OK_PREPEND(gen.Init(), "could not initialize CSR generator");
+  CertSignRequest csr;
+  RETURN_NOT_OK_PREPEND(gen.GenerateRequest(key, &csr), "could not generate CSR");
+
+  // Step 3: generate a self-signed cert that we can use for terminating TLS
+  // connections until we get the CA-signed cert.
+  Cert cert;
+  RETURN_NOT_OK_PREPEND(ca::CertSigner::SelfSignCert(key, config, &cert),
+                        "failed to self-sign cert");
+
+  // Workaround for an OpenSSL memory leak caused by a race in x509v3_cache_extensions.
+  // Upon first use of each certificate, this function gets called to parse various
+  // fields of the certificate. However, it's racey, so if multiple "first calls"
+  // happen concurrently, one call overwrites the cached data from another, causing
+  // a leak. Calling this nonsense X509_check_ca() forces the X509 extensions to
+  // get cached, so we don't hit the race later. 'VerifyCertChain' also has the
+  // effect of triggering the racy codepath.
+  ignore_result(X509_check_ca(cert.GetTopOfChainX509()));
+  ERR_clear_error(); // in case it left anything on the queue.
+
+  // Step 4: Adopt the new key and cert.
+  // The exclusive lock is taken only here, after all generation work is done.
+  unique_lock<RWMutex> lock(lock_);
+  CHECK(!has_cert_);
+  OPENSSL_RET_NOT_OK(SSL_CTX_use_PrivateKey(ctx_.get(), key.GetRawData()),
+                     "failed to use private key");
+  OPENSSL_RET_NOT_OK(SSL_CTX_use_certificate(ctx_.get(), cert.GetTopOfChainX509()),
+                     "failed to use certificate");
+  has_cert_ = true;
+  csr_ = std::move(csr);
+  return Status::OK();
+}
+
+// Returns a copy of the pending certificate signing request, or boost::none
+// when no CSR is currently stored in this context.
+boost::optional<CertSignRequest> TlsContext::GetCsrIfNecessary() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  shared_lock<RWMutex> lock(lock_);
+  if (!csr_) {
+    return boost::none;
+  }
+  return csr_->Clone();
+}
+
+// Replaces the context's current certificate with the signed certificate
+// 'cert' and clears the pending CSR.
+//
+// A no-op returning OK when there is no pending CSR. Otherwise 'cert' must
+// verify against the trust store and carry the same public key as the CSR;
+// RuntimeError is returned if the keys differ. Aborts the process if, after
+// installing the cert, it does not match the context's private key.
+Status TlsContext::AdoptSignedCert(const Cert& cert) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  unique_lock<RWMutex> lock(lock_);
+
+  if (!csr_) {
+    // A signed cert has already been adopted.
+    return Status::OK();
+  }
+
+  // Verify that the appropriate CA certs have been loaded into the context
+  // before we adopt a cert. Otherwise, client connections without the CA cert
+  // available would fail.
+  RETURN_NOT_OK(VerifyCertChainUnlocked(cert));
+
+  // The signed cert must be for the key we generated the CSR with.
+  PublicKey csr_key;
+  RETURN_NOT_OK(csr_->GetPublicKey(&csr_key));
+  PublicKey cert_key;
+  RETURN_NOT_OK(cert.GetPublicKey(&cert_key));
+  bool equals;
+  RETURN_NOT_OK(csr_key.Equals(cert_key, &equals));
+  if (!equals) {
+    return Status::RuntimeError("certificate public key does not match the CSR public key");
+  }
+
+  OPENSSL_RET_NOT_OK(SSL_CTX_use_certificate(ctx_.get(), cert.GetTopOfChainX509()),
+                     "failed to use certificate");
+
+  // This should never fail since we already compared the cert's public key
+  // against the CSR, but better safe than sorry. If this *does* fail, it
+  // appears to remove the private key from the SSL_CTX, so we are left in a bad
+  // state.
+  OPENSSL_CHECK_OK(SSL_CTX_check_private_key(ctx_.get()))
+    << "certificate does not match the private key";
+
+  // Clearing the CSR marks the signed cert as adopted (see the early return above).
+  csr_ = boost::none;
+
+  return Status::OK();
+}
+
+// Loads a PEM certificate from 'certificate_path' and an unencrypted PEM
+// private key from 'key_path', and installs them via UseCertificateAndKey().
+//
+// The context is marked as using an externally provided certificate only
+// after the certificate and key have been installed successfully, matching
+// LoadCertificateAndPasswordProtectedKey(). Returns the first error
+// encountered while reading either file or installing the pair.
+Status TlsContext::LoadCertificateAndKey(const string& certificate_path,
+                                         const string& key_path) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  Cert c;
+  RETURN_NOT_OK(c.FromFile(certificate_path, DataFormat::PEM));
+  PrivateKey k;
+  RETURN_NOT_OK(k.FromFile(key_path, DataFormat::PEM));
+  RETURN_NOT_OK(UseCertificateAndKey(c, k));
+  // Set the flag last so that a failed load does not leave it set.
+  is_external_cert_ = true;
+  return Status::OK();
+}
+
+// Loads a PEM certificate from 'certificate_path' and a PEM private key from
+// 'key_path', passing 'password_cb' to the key loader, then installs the pair
+// via UseCertificateAndKey(). On success the context is marked as using an
+// externally provided certificate.
+Status TlsContext::LoadCertificateAndPasswordProtectedKey(const string& certificate_path,
+                                                          const string& key_path,
+                                                          const PasswordCallback& password_cb) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  Cert loaded_cert;
+  RETURN_NOT_OK_PREPEND(loaded_cert.FromFile(certificate_path, DataFormat::PEM),
+                        "failed to load certificate");
+
+  PrivateKey loaded_key;
+  RETURN_NOT_OK_PREPEND(loaded_key.FromFile(key_path, DataFormat::PEM, password_cb),
+                        "failed to load private key file");
+
+  RETURN_NOT_OK(UseCertificateAndKey(loaded_cert, loaded_key));
+  is_external_cert_ = true;
+  return Status::OK();
+}
+
+// Reads the PEM certificate(s) at 'certificate_path' and adds them to this
+// context's trust store via AddTrustedCertificate().
+Status TlsContext::LoadCertificateAuthority(const string& certificate_path) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Debug-only sanity check: if a cert is already in place, it is expected to
+  // be an externally provided one.
+  if (has_cert_) {
+    DCHECK(is_external_cert_);
+  }
+
+  Cert ca_cert;
+  RETURN_NOT_OK(ca_cert.FromFile(certificate_path, DataFormat::PEM));
+  return AddTrustedCertificate(ca_cert);
+}
+
+// Prepares 'handshake' for a new TLS handshake of the given type: creates an
+// SSL handle from this context, attaches a pair of in-memory BIOs to it, and
+// puts it into accept (SERVER) or connect (CLIENT) state.
+//
+// CHECK-fails if the context has not been initialized or if 'handshake'
+// already owns an SSL handle. Returns RuntimeError if the SSL handle cannot be
+// created.
+Status TlsContext::InitiateHandshake(TlsHandshakeType handshake_type,
+                                     TlsHandshake* handshake) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ctx_);
+  CHECK(!handshake->ssl_);
+  {
+    // Create the SSL handle from the context while holding lock_ in shared mode.
+    shared_lock<RWMutex> lock(lock_);
+    auto new_ssl = ssl_make_unique(SSL_new(ctx_.get()));
+    handshake->adopt_ssl(std::move(new_ssl));
+  }
+  if (!handshake->ssl_) {
+    return Status::RuntimeError("failed to create SSL handle", GetOpenSSLErrors());
+  }
+
+  // Both directions of the connection go through memory BIOs.
+  BIO* read_bio = BIO_new(BIO_s_mem());
+  BIO* write_bio = BIO_new(BIO_s_mem());
+  SSL_set_bio(handshake->ssl(), read_bio, write_bio);
+
+  switch (handshake_type) {
+    case TlsHandshakeType::SERVER:
+      SSL_set_accept_state(handshake->ssl());
+      break;
+    case TlsHandshakeType::CLIENT:
+      SSL_set_connect_state(handshake->ssl());
+      break;
+  }
+
+  return Status::OK();
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.h b/be/src/kudu/security/tls_context.h
new file mode 100644
index 0000000..786ab6e
--- /dev/null
+++ b/be/src/kudu/security/tls_context.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+
+#include "kudu/gutil/port.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/util/status.h"
+// IWYU pragma: no_include "kudu/security/cert.h"
+
+namespace kudu {
+namespace security {
+
+class Cert;           // IWYU pragma: keep
+class CertSignRequest;// IWYU pragma: keep
+class PrivateKey;
+
+// TlsContext wraps data required by the OpenSSL library for creating and
+// accepting TLS protected channels. A single TlsContext instance should be used
+// per server or client instance.
+//
+// Internally, a 'TlsContext' manages a single keypair which it uses for
+// terminating TLS connections. It also manages a collection of trusted root CA
+// certificates (a trust store), as well as a signed certificate for the
+// keypair.
+//
+// When used on a server, the TlsContext can generate a keypair and a
+// self-signed certificate, and provide a CSR for transitioning to a CA-signed
+// certificate. This allows Kudu servers to start with a self-signed
+// certificate, and later adopt a CA-signed certificate as it becomes available.
+// See GenerateSelfSignedCertAndKey(), GetCsrIfNecessary(), and
+// AdoptSignedCert() for details on how to generate the keypair and self-signed
+// cert, access the CSR, and transition to a CA-signed cert, respectively.
+//
+// When used in a client or a server, the TlsContext can immediately adopt a
+// private key and CA-signed cert using UseCertificateAndKey(). A TlsContext
+// only manages a single keypair, so if UseCertificateAndKey() is called,
+// GenerateSelfSignedCertAndKey() must not be called, and vice versa.
+//
+// TlsContext may be used with or without a keypair and cert to initiate TLS
+// connections, when mutual TLS authentication is not needed (for example, for
+// token or Kerberos authenticated connections).
+//
+// This class is thread-safe after initialization.
+class TlsContext {
+
+ public:
+
+  TlsContext();
+
+  TlsContext(std::string tls_ciphers, std::string tls_min_protocol);
+
+  ~TlsContext() = default;
+
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Returns true if this TlsContext has been configured with a cert and key for
+  // use with TLS-encrypted connections.
+  bool has_cert() const {
+    shared_lock<RWMutex> lock(lock_);
+    return has_cert_;
+  }
+
+  // Returns true if this TlsContext has been configured with a CA-signed TLS
+  // cert and key for use with TLS-encrypted connections. If this method returns
+  // true, then 'has_trusted_cert' will also return true.
+  bool has_signed_cert() const {
+    shared_lock<RWMutex> lock(lock_);
+    return has_cert_ && !csr_;
+  }
+
+  // Returns true if this TlsContext has at least one certificate in its trust store.
+  bool has_trusted_cert() const {
+    shared_lock<RWMutex> lock(lock_);
+    return trusted_cert_count_ > 0;
+  }
+
+  // Adds 'cert' as a trusted root CA certificate.
+  //
+  // This determines whether other peers are trusted. It also must be called for
+  // any CA certificates that are part of the certificate chain for the cert
+  // passed in to 'UseCertificateAndKey()' or 'AdoptSignedCert()'.
+  //
+  // If this cert has already been marked as trusted, this has no effect.
+  Status AddTrustedCertificate(const Cert& cert) WARN_UNUSED_RESULT;
+
+  // Dump all of the certs that are currently trusted by this context, in DER
+  // form, into 'cert_ders'.
+  Status DumpTrustedCerts(std::vector<std::string>* cert_ders) const WARN_UNUSED_RESULT;
+
+  // Uses 'cert' and 'key' as the cert and key for use with TLS connections.
+  //
+  // Checks that the CA that issued the signature on 'cert' is already trusted
+  // by this context (e.g. by AddTrustedCertificate()).
+  Status UseCertificateAndKey(const Cert& cert, const PrivateKey& key) WARN_UNUSED_RESULT;
+
+  // Generates a self-signed cert and key for use with TLS connections.
+  //
+  // This method should only be used on the server. Once this method is called,
+  // 'GetCsrIfNecessary' can be used to retrieve a CSR for generating a
+  // CA-signed cert for the generated private key, and 'AdoptSignedCert' can be
+  // used to transition to using the CA-signed cert with subsequent TLS
+  // connections.
+  Status GenerateSelfSignedCertAndKey() WARN_UNUSED_RESULT;
+
+  // Returns a new certificate signing request (CSR) in DER format, if this
+  // context's cert is self-signed. If the cert is already signed, returns
+  // boost::none.
+  boost::optional<CertSignRequest> GetCsrIfNecessary() const;
+
+  // Adopts the provided CA-signed certificate for this TLS context.
+  //
+  // The certificate must correspond to a CSR previously returned by
+  // 'GetCsrIfNecessary()'.
+  //
+  // Checks that the CA that issued the signature on 'cert' is already trusted
+  // by this context (e.g. by AddTrustedCertificate()).
+  //
+  // This has no effect if the instance already has a CA-signed cert.
+  Status AdoptSignedCert(const Cert& cert) WARN_UNUSED_RESULT;
+
+  // Convenience functions for loading cert/CA/key from file paths.
+  // -------------------------------------------------------------
+
+  // Load the server certificate and key (PEM encoded).
+  Status LoadCertificateAndKey(const std::string& certificate_path,
+                               const std::string& key_path) WARN_UNUSED_RESULT;
+
+  // Load the server certificate and key (PEM encoded), and use the callback
+  // 'password_cb' to obtain the password that can decrypt the key.
+  Status LoadCertificateAndPasswordProtectedKey(const std::string& certificate_path,
+                                                const std::string& key_path,
+                                                const PasswordCallback& password_cb)
+                                                WARN_UNUSED_RESULT;
+
+  // Load the certificate authority (PEM encoded).
+  Status LoadCertificateAuthority(const std::string& certificate_path) WARN_UNUSED_RESULT;
+
+  // Initiates a new TlsHandshake instance.
+  Status InitiateHandshake(TlsHandshakeType handshake_type,
+                           TlsHandshake* handshake) const WARN_UNUSED_RESULT;
+
+  // Return the number of certs that have been marked as trusted.
+  // Used by tests.
+  int trusted_cert_count_for_tests() const {
+    shared_lock<RWMutex> lock(lock_);
+    return trusted_cert_count_;
+  }
+  // NOTE(review): reads is_external_cert_ without taking lock_, unlike the accessors above.
+  bool is_external_cert() const { return is_external_cert_; }
+
+ private:
+
+  Status VerifyCertChainUnlocked(const Cert& cert) WARN_UNUSED_RESULT;
+
+  // The cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  std::string tls_ciphers_;
+
+  // The minimum protocol version to allow when securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  std::string tls_min_protocol_;
+
+  // Protects all members.
+  //
+  // Taken in write mode when any changes are modifying the underlying SSL_CTX
+  // using a mutating method (eg SSL_CTX_use_*) or when changing the value of
+  // any of our own member variables.
+  mutable RWMutex lock_;
+  c_unique_ptr<SSL_CTX> ctx_;
+  int32_t trusted_cert_count_;
+  bool has_cert_;
+  bool is_external_cert_;
+  boost::optional<CertSignRequest> csr_;
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_handshake-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake-test.cc b/be/src/kudu/security/tls_handshake-test.cc
new file mode 100644
index 0000000..c863d05
--- /dev/null
+++ b/be/src/kudu/security/tls_handshake-test.cc
@@ -0,0 +1,390 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <atomic>
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/security/ca/cert_management.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+DECLARE_int32(ipki_server_key_size);
+
+namespace kudu {
+namespace security {
+
+using ca::CertSigner;
+
+struct Case {  // One parameterized handshake scenario.
+  PkiConfig client_pki;  // PKI configuration applied to the client context.
+  TlsVerificationMode client_verification;  // Verification mode used by the client.
+  PkiConfig server_pki;  // PKI configuration applied to the server context.
+  TlsVerificationMode server_verification;  // Verification mode used by the server.
+  Status expected_status;  // Expected code; its message is matched as a pattern.
+};
+
+// Beautifies CLI test output by printing every field of a Case.
+std::ostream& operator<<(std::ostream& o, Case c) {
+  // Maps a verification mode to the short label used in the output.
+  const auto mode_label = [] (TlsVerificationMode mode) -> const char* {
+    if (mode == TlsVerificationMode::VERIFY_NONE) return "NONE";
+    if (mode == TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST) return "REMOTE_CERT_AND_HOST";
+    return "unreachable";
+  };
+
+  // Emit the fields one statement at a time, in declaration order.
+  o << "{client-pki: " << c.client_pki << ", ";
+  o << "client-verification: " << mode_label(c.client_verification) << ", ";
+  o << "server-pki: " << c.server_pki << ", ";
+  o << "server-verification: " << mode_label(c.server_verification) << ", ";
+  o << "expected-status: " << c.expected_status.ToString() << "}";
+
+  return o;
+}
+
+class TestTlsHandshakeBase : public KuduTest {  // Fixture owning a client and a server TlsContext.
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    // Initialize both TLS contexts; a failure aborts the test setup.
+    ASSERT_OK(client_tls_.Init());
+    ASSERT_OK(server_tls_.Init());
+  }
+
+ protected:
+  // Run a handshake using 'client_tls_' and 'server_tls_'. The client and server
+  // verification modes are set to 'client_verify' and 'server_verify' respectively.
+  Status RunHandshake(TlsVerificationMode client_verify,
+                      TlsVerificationMode server_verify) {
+    TlsHandshake client, server;
+    RETURN_NOT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+    RETURN_NOT_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+
+    client.set_verification_mode(client_verify);
+    server.set_verification_mode(server_verify);
+
+    bool client_done = false, server_done = false;
+    string to_client;  // Bytes produced by the server for the client.
+    string to_server;  // Bytes produced by the client for the server.
+    while (!client_done || !server_done) {  // Alternate sides until both report OK.
+      if (!client_done) {
+        Status s = client.Continue(to_client, &to_server);
+        VLOG(1) << "client->server: " << to_server.size() << " bytes";
+        if (s.ok()) {
+          client_done = true;
+        } else if (!s.IsIncomplete()) {
+          CHECK(s.IsRuntimeError());  // Any non-Incomplete failure must be a RuntimeError.
+          return s.CloneAndPrepend("client error");
+        }
+      }
+      if (!server_done) {
+        CHECK(!client_done);  // The client must not complete while the server is unfinished.
+        Status s = server.Continue(to_server, &to_client);
+        VLOG(1) << "server->client: " << to_client.size() << " bytes";
+        if (s.ok()) {
+          server_done = true;
+        } else if (!s.IsIncomplete()) {
+          CHECK(s.IsRuntimeError());  // Any non-Incomplete failure must be a RuntimeError.
+          return s.CloneAndPrepend("server error");
+        }
+      }
+    }
+    return Status::OK();
+  }
+
+  TlsContext client_tls_;
+  TlsContext server_tls_;
+
+  string cert_path_;  // NOTE(review): not referenced by the tests in this file.
+  string key_path_;  // NOTE(review): not referenced by the tests in this file.
+};
+
+class TestTlsHandshake : public TestTlsHandshakeBase,  // Parameterized by Case.
+                   public ::testing::WithParamInterface<Case> {};
+
+class TestTlsHandshakeConcurrent : public TestTlsHandshakeBase,  // Parameterized by int.
+                   public ::testing::WithParamInterface<int> {};
+
+// Test concurrently running handshakes while changing the certificates on the TLS
+// context. We parameterize across different numbers of threads, because surprisingly,
+// fewer threads seem to trigger issues more easily in some cases.
+INSTANTIATE_TEST_CASE_P(NumThreads, TestTlsHandshakeConcurrent, ::testing::Values(1, 2, 4, 8));
+TEST_P(TestTlsHandshakeConcurrent, TestConcurrentAdoptCert) {
+  const int kNumThreads = GetParam();
+
+  ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+  std::atomic<bool> done(false);  // Set to true to stop the handshake threads.
+  vector<std::thread> handshake_threads;
+  for (int i = 0; i < kNumThreads; i++) {
+    handshake_threads.emplace_back([&]() {
+        while (!done) {  // NOTE(review): the Status returned below is discarded.
+          RunHandshake(TlsVerificationMode::VERIFY_NONE, TlsVerificationMode::VERIFY_NONE);
+        }
+      });
+  }
+  auto c = MakeScopedCleanup([&](){  // Cleanup: signal the threads to stop, then join them.
+      done = true;
+      for (std::thread& t : handshake_threads) {
+        t.join();
+      }
+    });
+
+  SleepFor(MonoDelta::FromMilliseconds(10));  // Handshake threads keep running meanwhile.
+  {
+    PrivateKey ca_key;
+    Cert ca_cert;
+    ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+    Cert cert;  // The server's CSR signed by the test CA.
+    ASSERT_OK(CertSigner(&ca_cert, &ca_key).Sign(*server_tls_.GetCsrIfNecessary(), &cert));
+    ASSERT_OK(server_tls_.AddTrustedCertificate(ca_cert));
+    ASSERT_OK(server_tls_.AdoptSignedCert(cert));
+  }
+  SleepFor(MonoDelta::FromMilliseconds(10));  // Keep handshaking after the cert change.
+}
+
+TEST_F(TestTlsHandshake, TestHandshakeSequence) {  // Steps a handshake message by message.
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Both client and server have certs and CA.
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &client_tls_));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &server_tls_));
+  // Create each handshake from the context that matches its role.
+  TlsHandshake server;
+  TlsHandshake client;
+  ASSERT_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &client));
+
+  string buf1;
+  string buf2;
+
+  // Client sends Hello
+  ASSERT_TRUE(client.Continue(buf1, &buf2).IsIncomplete());
+  ASSERT_GT(buf2.size(), 0);
+
+  // Server receives client Hello, and sends server Hello
+  ASSERT_TRUE(server.Continue(buf2, &buf1).IsIncomplete());
+  ASSERT_GT(buf1.size(), 0);
+
+  // Client receives server Hello and sends client Finished
+  ASSERT_TRUE(client.Continue(buf1, &buf2).IsIncomplete());
+  ASSERT_GT(buf2.size(), 0);
+
+  // Server receives client Finished and sends server Finished
+  ASSERT_OK(server.Continue(buf2, &buf1));
+  ASSERT_GT(buf1.size(), 0);
+
+  // Client receives server Finished
+  ASSERT_OK(client.Continue(buf1, &buf2));
+  ASSERT_EQ(buf2.size(), 0);
+}
+
+// Tests that the TlsContext can transition from a self-signed cert to a signed
+// cert, and that it rejects invalid certs along the way. We are testing this
+// here instead of in a dedicated TlsContext test because it requires completing
+// handshakes to fully validate.
+TEST_F(TestTlsHandshake, TestTlsContextCertTransition) {
+  ASSERT_FALSE(server_tls_.has_cert());
+  ASSERT_FALSE(server_tls_.has_signed_cert());
+  ASSERT_EQ(boost::none, server_tls_.GetCsrIfNecessary());
+  // With a self-signed cert, unverified handshakes pass and verified ones fail.
+  ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+  ASSERT_TRUE(server_tls_.has_cert());
+  ASSERT_FALSE(server_tls_.has_signed_cert());
+  ASSERT_NE(boost::none, server_tls_.GetCsrIfNecessary());
+  ASSERT_OK(RunHandshake(TlsVerificationMode::VERIFY_NONE, TlsVerificationMode::VERIFY_NONE));
+  ASSERT_STR_MATCHES(RunHandshake(TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+                                  TlsVerificationMode::VERIFY_NONE).ToString(),
+                     "client error:.*certificate verify failed");
+  // Create a test CA and use it to sign the server's CSR.
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  Cert cert;
+  ASSERT_OK(CertSigner(&ca_cert, &ca_key).Sign(*server_tls_.GetCsrIfNecessary(), &cert));
+
+  // Try to adopt the cert without first trusting the CA.
+  ASSERT_STR_MATCHES(server_tls_.AdoptSignedCert(cert).ToString(),
+                     "could not verify certificate chain");
+
+  // Check that we can still do (unverified) handshakes.
+  ASSERT_TRUE(server_tls_.has_cert());
+  ASSERT_FALSE(server_tls_.has_signed_cert());
+  ASSERT_OK(RunHandshake(TlsVerificationMode::VERIFY_NONE, TlsVerificationMode::VERIFY_NONE));
+
+  // Trust the root cert.
+  ASSERT_OK(server_tls_.AddTrustedCertificate(ca_cert));
+
+  // Generate a bogus cert (signed from another context's CSR) and attempt to adopt it.
+  Cert bogus_cert;
+  {
+    TlsContext bogus_tls;
+    ASSERT_OK(bogus_tls.Init());
+    ASSERT_OK(bogus_tls.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(CertSigner(&ca_cert, &ca_key).Sign(*bogus_tls.GetCsrIfNecessary(), &bogus_cert));
+  }
+  ASSERT_STR_MATCHES(server_tls_.AdoptSignedCert(bogus_cert).ToString(),
+                     "certificate public key does not match the CSR public key");
+
+  // Check that we can still do (unverified) handshakes.
+  ASSERT_TRUE(server_tls_.has_cert());
+  ASSERT_FALSE(server_tls_.has_signed_cert());
+  ASSERT_OK(RunHandshake(TlsVerificationMode::VERIFY_NONE, TlsVerificationMode::VERIFY_NONE));
+
+  // Adopt the legitimate signed cert.
+  ASSERT_OK(server_tls_.AdoptSignedCert(cert));
+
+  // Check that we can do verified handshakes once the client trusts the CA.
+  ASSERT_TRUE(server_tls_.has_cert());
+  ASSERT_TRUE(server_tls_.has_signed_cert());
+  ASSERT_OK(RunHandshake(TlsVerificationMode::VERIFY_NONE, TlsVerificationMode::VERIFY_NONE));
+  ASSERT_OK(client_tls_.AddTrustedCertificate(ca_cert));
+  ASSERT_OK(RunHandshake(TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+                         TlsVerificationMode::VERIFY_NONE));
+}
+
+TEST_P(TestTlsHandshake, TestHandshake) {
+  Case test_case = GetParam();
+
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+  // Configure both contexts from the same test CA, per the case's PKI settings.
+  ASSERT_OK(ConfigureTlsContext(test_case.client_pki, ca_cert, ca_key, &client_tls_));
+  ASSERT_OK(ConfigureTlsContext(test_case.server_pki, ca_cert, ca_key, &server_tls_));
+
+  Status s = RunHandshake(test_case.client_verification, test_case.server_verification);
+  // Compare status codes exactly; match the status text against the expected message.
+  EXPECT_EQ(test_case.expected_status.CodeAsString(), s.CodeAsString());
+  ASSERT_STR_MATCHES(s.ToString(), test_case.expected_status.message().ToString());
+}
+
+INSTANTIATE_TEST_CASE_P(CertCombinations,
+                        TestTlsHandshake,
+                        ::testing::Values(
+
+        // We don't test any cases where the server has no cert or the client
+        // has a self-signed cert, since we don't expect those to occur in
+        // practice.
+        // Case fields: client PKI + verification, server PKI + verification, expected status.
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::NONE, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+        Case { PkiConfig::TRUSTED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("server error:.*peer did not return a certificate") },
+
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               // OpenSSL 1.0.0 returns "no certificate returned" for this case,
+               // which appears to be a bug.
+               Status::RuntimeError("server error:.*(certificate verify failed|"
+                                                    "no certificate returned)") },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SELF_SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::RuntimeError("client error:.*certificate verify failed") },
+
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               Status::OK() },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_NONE,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::OK() },
+        Case { PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               PkiConfig::SIGNED, TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST,
+               Status::OK() }
+));
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_handshake.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake.cc b/be/src/kudu/security/tls_handshake.cc
new file mode 100644
index 0000000..5a89592
--- /dev/null
+++ b/be/src/kudu/security/tls_handshake.cc
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <memory>
+#include <string>
+
+#include <openssl/ssl.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/trace.h"
+
+#if OPENSSL_VERSION_NUMBER < 0x10002000L
+#include "kudu/security/x509_check_host.h"
+#endif // OPENSSL_VERSION_NUMBER
+
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+
+void TlsHandshake::SetSSLVerify() {  // Translates verification_mode_ into OpenSSL verify flags.
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ssl_);
+  CHECK(!has_started_);  // Must run before the handshake is marked as started.
+  int ssl_mode = 0;  // Bitmask of SSL_VERIFY_* flags passed to SSL_set_verify().
+  switch (verification_mode_) {
+    case TlsVerificationMode::VERIFY_NONE:
+      ssl_mode = SSL_VERIFY_NONE;
+      break;
+    case TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST:
+      // Server mode: the server sends a client certificate request to the client. The
+      // certificate returned (if any) is checked. If the verification process fails, the TLS/SSL
+      // handshake is immediately terminated with an alert message containing the reason for the
+      // verification failure. The behaviour can be controlled by the additional
+      // SSL_VERIFY_FAIL_IF_NO_PEER_CERT and SSL_VERIFY_CLIENT_ONCE flags.
+
+      // Client mode: the server certificate is verified. If the verification process fails, the
+      // TLS/SSL handshake is immediately terminated with an alert message containing the reason
+      // for the verification failure. If no server certificate is sent, because an anonymous
+      // cipher is used, SSL_VERIFY_PEER is ignored.
+      ssl_mode |= SSL_VERIFY_PEER;
+
+      // Server mode: if the client did not return a certificate, the TLS/SSL handshake is
+      // immediately terminated with a "handshake failure" alert. This flag must be used
+      // together with SSL_VERIFY_PEER.
+      ssl_mode |= SSL_VERIFY_FAIL_IF_NO_PEER_CERT;
+      // Server mode: only request a client certificate on the initial TLS/SSL handshake. Do
+      // not ask for a client certificate again in case of a renegotiation. This flag must be
+      // used together with SSL_VERIFY_PEER.
+      ssl_mode |= SSL_VERIFY_CLIENT_ONCE;
+      break;
+  }
+
+  SSL_set_verify(ssl_.get(), ssl_mode, /* callback = */nullptr);
+}
+
+Status TlsHandshake::Continue(const string& recv, string* send) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  if (!has_started_) {  // First call: apply the verification mode before handshaking.
+    SetSSLVerify();
+    has_started_ = true;
+  }
+  CHECK(ssl_);
+  // Feed the bytes in 'recv' into the read BIO, then advance the handshake.
+  BIO* rbio = SSL_get_rbio(ssl_.get());
+  int n = BIO_write(rbio, recv.data(), recv.size());
+  DCHECK(n == recv.size() || (n == -1 && recv.empty()));
+  DCHECK_EQ(BIO_ctrl_pending(rbio), recv.size());
+
+  int rc = SSL_do_handshake(ssl_.get());
+  if (rc != 1) {
+    int ssl_err = SSL_get_error(ssl_.get(), rc);
+    // WANT_READ and WANT_WRITE indicate that the handshake is not yet complete.
+    if (ssl_err != SSL_ERROR_WANT_READ && ssl_err != SSL_ERROR_WANT_WRITE) {
+      return Status::RuntimeError("TLS Handshake error", GetSSLErrorDescription(ssl_err));
+    }
+    // In the case that we got SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE,
+    // the OpenSSL implementation guarantees that there is no error entered into
+    // the ERR error queue, so no need to ERR_clear_error() here.
+  }
+  // Move everything pending on the write BIO into 'send' (resized to fit).
+  BIO* wbio = SSL_get_wbio(ssl_.get());
+  int pending = BIO_ctrl_pending(wbio);
+
+  send->resize(pending);
+  BIO_read(wbio, &(*send)[0], send->size());  // NOTE(review): return value is not checked.
+  DCHECK_EQ(BIO_ctrl_pending(wbio), 0);
+
+  if (rc == 1) {
+    // The handshake is done, but in the case of the server, we still need to
+    // send the final response to the client.
+    DCHECK_GE(send->size(), 0);  // NOTE(review): always true, size() is unsigned.
+    return Status::OK();
+  }
+  return Status::Incomplete("TLS Handshake incomplete");
+}
+
+Status TlsHandshake::Verify(const Socket& socket) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ssl_);  // Validate the handle first: the DCHECK below passes it to OpenSSL.
+  DCHECK(SSL_is_init_finished(ssl_.get()));
+
+  if (verification_mode_ == TlsVerificationMode::VERIFY_NONE) {
+    return Status::OK();
+  }
+  DCHECK(verification_mode_ == TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST);
+
+  int rc = SSL_get_verify_result(ssl_.get());
+  if (rc != X509_V_OK) {
+    return Status::NotAuthorized(Substitute("SSL cert verification failed: $0",
+                                            X509_verify_cert_error_string(rc)),
+                                 GetOpenSSLErrors());
+  }
+
+  // Get the peer certificate.
+  X509* cert = remote_cert_.GetTopOfChainX509();
+  if (!cert) {
+    if (SSL_get_verify_mode(ssl_.get()) & SSL_VERIFY_FAIL_IF_NO_PEER_CERT) {
+      return Status::NotAuthorized("Handshake failed: unable to retreive peer certificate");
+    }
+    // No cert, but we weren't requiring one.
+    TRACE("Got no cert from peer, but not required");
+    return Status::OK();
+  }
+
+  // TODO(KUDU-1886): Do hostname verification.
+  /*
+  TRACE("Verifying peer cert");
+
+  // Get the peer's hostname
+  Sockaddr peer_addr;
+  if (!socket.GetPeerAddress(&peer_addr).ok()) {
+    return Status::NotAuthorized(
+        "TLS certificate hostname verification failed: unable to get peer address");
+  }
+  string peer_hostname;
+  RETURN_NOT_OK_PREPEND(peer_addr.LookupHostname(&peer_hostname),
+      "TLS certificate hostname verification failed: unable to lookup peer hostname");
+
+  // Check if the hostname matches with either the Common Name or any of the Subject Alternative
+  // Names of the certificate.
+  int match = X509_check_host(cert,
+                              peer_hostname.c_str(),
+                              peer_hostname.length(),
+                              0,
+                              nullptr);
+  if (match == 0) {
+    return Status::NotAuthorized("TLS certificate hostname verification failed");
+  }
+  if (match < 0) {
+    return Status::RuntimeError("TLS certificate hostname verification error", GetOpenSSLErrors());
+  }
+  DCHECK_EQ(match, 1);
+  */
+  return Status::OK();
+}
+
+Status TlsHandshake::GetCerts() {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Local cert: SSL_get_certificate() (unlike SSL_get_peer_certificate()) does
+  // not increment the X509's reference count, so one is added on adoption.
+  if (X509* local = SSL_get_certificate(ssl_.get())) {
+    local_cert_.AdoptAndAddRefX509(local);
+  }
+
+  // Peer cert: adopted as returned, without adding another reference.
+  if (X509* peer = SSL_get_peer_certificate(ssl_.get())) {
+    remote_cert_.AdoptX509(peer);
+  }
+
+  return Status::OK();
+}
+
+Status TlsHandshake::Finish(unique_ptr<Socket>* socket) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  RETURN_NOT_OK(GetCerts());
+  RETURN_NOT_OK(Verify(**socket));
+
+  // Only borrow the fd here so '*socket' still owns it if SSL_set_fd() fails (no fd leak).
+  int fd = (*socket)->GetFd();
+
+  // Give the socket to the SSL instance. This will automatically free the
+  // read and write memory BIO instances.
+  if (SSL_set_fd(ssl_.get(), fd) != 1) {
+    return Status::RuntimeError("TLS handshake error", GetOpenSSLErrors());
+  }
+
+  // Take the fd away from the old socket, then hand it and the SSL instance to a TlsSocket.
+  fd = (*socket)->Release();
+  socket->reset(new TlsSocket(fd, std::move(ssl_)));
+  return Status::OK();
+}
+
+Status TlsHandshake::FinishNoWrap(const Socket& socket) {  // Finishes without wrapping 'socket'.
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  RETURN_NOT_OK(GetCerts());  // Record the local and remote certs.
+  return Verify(socket);  // Result of the peer verification.
+}
+
+Status TlsHandshake::GetLocalCert(Cert* cert) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Share the local cert's raw data with 'cert', if a local cert is set.
+  auto raw = local_cert_.GetRawData();
+  if (!raw) return Status::RuntimeError("no local certificate");
+  cert->AdoptAndAddRefRawData(raw);
+  return Status::OK();
+}
+
+Status TlsHandshake::GetRemoteCert(Cert* cert) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Share the remote cert's raw data with 'cert', if a remote cert is set.
+  auto raw = remote_cert_.GetRawData();
+  if (!raw) return Status::RuntimeError("no remote certificate");
+  cert->AdoptAndAddRefRawData(raw);
+  return Status::OK();
+}
+
+string TlsHandshake::GetCipherSuite() const {  // Cipher name as reported by OpenSSL.
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(has_started_);  // Requires that Continue() has been called at least once.
+  return SSL_get_cipher_name(ssl_.get());
+}
+
+string TlsHandshake::GetProtocol() const {  // Protocol version string as reported by OpenSSL.
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(has_started_);  // Requires that Continue() has been called at least once.
+  return SSL_get_version(ssl_.get());
+}
+
+string TlsHandshake::GetCipherDescription() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(has_started_);
+  const SSL_CIPHER* current = SSL_get_current_cipher(ssl_.get());
+  if (current == nullptr) return "NONE";
+
+  // Ask OpenSSL to write its textual description of the cipher into 'text'.
+  char text[512];
+  const char* described = SSL_CIPHER_description(current, text, sizeof(text));
+  if (described == nullptr) return "NONE";
+
+  // Tidy up: strip the trailing newline and duplicate ' ' characters.
+  string result(described);
+  StripTrailingNewline(&result);
+  StripDupCharacters(&result, ' ', 0);
+  return result;
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_handshake.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_handshake.h b/be/src/kudu/security/tls_handshake.h
new file mode 100644
index 0000000..56020c4
--- /dev/null
+++ b/be/src/kudu/security/tls_handshake.h
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Socket;
+
+namespace security {
+
+enum class TlsHandshakeType {
+  // The local endpoint acts as the TLS client, i.e. the initiator.
+  CLIENT,
+  // The local endpoint acts as the TLS server, i.e. the acceptor.
+  SERVER,
+};
+
+// Mode for performing verification of the remote peer's identity during a handshake.
+enum class TlsVerificationMode {
+  // SERVER:
+  //    No certificate will be requested from the client, and no verification
+  //    will be done.
+  // CLIENT:
+  //    The server's certificate will be obtained but no verification will be done
+  //    (the server still requires a certificate, even if it is self-signed).
+  VERIFY_NONE,
+
+  // BOTH:
+  //    The remote peer is required to have a signed certificate. The certificate
+  //    will be verified in two ways:
+  //     1) The certificate must be signed by a trusted CA (or chain of CAs).
+  //     2) The hostname of the remote peer (as determined by reverse DNS of the
+  //        socket address) must match the common name or one of the Subject
+  //        Alternative Names stored in the certificate.
+  VERIFY_REMOTE_CERT_AND_HOST
+};
+
+// TlsHandshake manages an ongoing TLS handshake between a client and server.
+//
+// TlsHandshake instances are default constructed, but must be initialized
+// before use via TlsContext::InitiateHandshake.
+class TlsHandshake {
+ public:
+
+   TlsHandshake() = default;
+   ~TlsHandshake() = default;
+
+  // Set the verification mode for this handshake. The default verification mode
+  // is VERIFY_REMOTE_CERT_AND_HOST.
+  //
+  // This must be called before the first call to Continue().
+  void set_verification_mode(TlsVerificationMode mode) {
+    DCHECK(!has_started_);
+    verification_mode_ = mode;
+  }
+
+  // Continue or start a new handshake.
+  //
+  // 'recv' should contain the input buffer from the remote end, or an empty
+  // string when the handshake is new.
+  //
+  // 'send' receives the output buffer which must be sent to the remote
+  // end.
+  //
+  // Returns Status::OK when the handshake is complete, however the 'send'
+  // buffer may contain a message which must still be transmitted to the remote
+  // end. If the send buffer is empty after this call and the return is
+  // Status::OK, the socket should immediately be wrapped in the TLS channel
+  // using 'Finish'. If the send buffer is not empty, the message should be sent
+  // to the remote end, and then the socket should be wrapped using 'Finish'.
+  //
+  // Returns Status::Incomplete when the handshake must continue for another
+  // round of messages.
+  //
+  // Returns any other status code on error.
+  Status Continue(const std::string& recv, std::string* send) WARN_UNUSED_RESULT;
+
+  // Finishes the handshake, wrapping the provided socket in the negotiated TLS
+  // channel (the SSL handle is transferred to the new socket). This
+  // 'TlsHandshake' instance should not be used again after calling this.
+  Status Finish(std::unique_ptr<Socket>* socket) WARN_UNUSED_RESULT;
+
+  // Finish the handshake, using the provided socket to verify the remote peer,
+  // but without wrapping the socket.
+  Status FinishNoWrap(const Socket& socket) WARN_UNUSED_RESULT;
+
+  // Retrieve the local certificate. This will return an error status if there
+  // is no local certificate.
+  //
+  // May only be called after 'Finish' or 'FinishNoWrap'.
+  Status GetLocalCert(Cert* cert) const WARN_UNUSED_RESULT;
+
+  // Retrieve the remote peer's certificate. This will return an error status if
+  // there is no remote certificate.
+  //
+  // May only be called after 'Finish' or 'FinishNoWrap'.
+  Status GetRemoteCert(Cert* cert) const WARN_UNUSED_RESULT;
+
+  // Retrieve the negotiated cipher suite. Only valid to call after the
+  // handshake is complete and before 'Finish()'.
+  std::string GetCipherSuite() const;
+
+  // Retrieve the negotiated TLS protocol version. Only valid to call after the
+  // handshake is complete and before 'Finish()'.
+  std::string GetProtocol() const;
+
+  // Retrieve the description of the negotiated cipher.
+  // Only valid to call after the handshake is complete and before 'Finish()'.
+  std::string GetCipherDescription() const;
+
+ private:
+  friend class TlsContext;
+
+  bool has_started_ = false;  // Presumably set by Continue(); see the .cc file.
+  TlsVerificationMode verification_mode_ = TlsVerificationMode::VERIFY_REMOTE_CERT_AND_HOST;
+
+  // Set the verification mode on the underlying SSL object.
+  void SetSSLVerify();
+
+  // Set the SSL to use during the handshake. Called once by
+  // TlsContext::InitiateHandshake before starting the handshake process.
+  void adopt_ssl(c_unique_ptr<SSL> ssl) {
+    CHECK(!ssl_);
+    ssl_ = std::move(ssl);
+  }
+
+  SSL* ssl() {  // Non-owning access to the SSL handle.
+    return ssl_.get();
+  }
+
+  // Populates local_cert_ and remote_cert_.
+  Status GetCerts() WARN_UNUSED_RESULT;
+
+  // Verifies that the handshake is valid for the provided socket.
+  Status Verify(const Socket& socket) const WARN_UNUSED_RESULT;
+
+  // Owned SSL handle.
+  c_unique_ptr<SSL> ssl_;
+
+  Cert local_cert_;   // Populated by GetCerts().
+  Cert remote_cert_;  // Populated by GetCerts().
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_socket-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket-test.cc b/be/src/kudu/security/tls_socket-test.cc
new file mode 100644
index 0000000..001a206
--- /dev/null
+++ b/be/src/kudu/security/tls_socket-test.cc
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_handshake.h"
+
+#include <algorithm>
+#include <pthread.h>
+#include <sched.h>
+#include <sys/uio.h>
+
+#include <atomic>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+const MonoDelta kTimeout = MonoDelta::FromSeconds(10);  // Timeout for blocking socket I/O below.
+
+// 32 MiB: big enough to not fit into an output socket buffer of default size
+// (the size controlled by setsockopt() with SO_SNDBUF).
+constexpr size_t kEchoChunkSize = 32 * 1024 * 1024;
+
+class TlsSocketTest : public KuduTest {  // Fixture owning the client-side TlsContext.
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(client_tls_.Init());  // Initialized once per test.
+  }
+
+ protected:
+  void ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock);  // Connect + TLS handshake.
+  TlsContext client_tls_;
+};
+
+// Runs one side of a TLS handshake over 'sock' with peer verification disabled,
+// exchanging handshake messages until 'tls' reports completion.
+Status DoNegotiationSide(Socket* sock, TlsHandshake* tls, const char* side) {
+  tls->set_verification_mode(TlsVerificationMode::VERIFY_NONE);
+
+  bool done = false;
+  string received;
+  while (!done) {
+    string to_send;
+    Status s = tls->Continue(received, &to_send);
+    if (!s.ok() && !s.IsIncomplete()) {  // Incomplete just means "one more round".
+      RETURN_NOT_OK_PREPEND(s, "unexpected tls error");
+    }
+    done = s.ok();
+    if (!to_send.empty()) {
+      // A final message may still need sending even when the handshake is done.
+      size_t nwritten;
+      RETURN_NOT_OK_PREPEND(sock->BlockingWrite(
+          reinterpret_cast<const uint8_t*>(to_send.data()),
+          to_send.size(), &nwritten, MonoTime::Now() + kTimeout),
+                            "error sending");
+    }
+    if (!done) {
+      uint8_t buf[1024];
+      int32_t n = 0;
+      RETURN_NOT_OK_PREPEND(sock->Recv(buf, arraysize(buf), &n),
+                            "error receiving");
+      received.assign(reinterpret_cast<char*>(&buf[0]), n);
+    }
+  }
+  LOG(INFO) << side << ": negotiation complete";
+  return Status::OK();
+}
+
+void TlsSocketTest::ConnectClient(const Sockaddr& addr, unique_ptr<Socket>* sock) {
+  // Plain TCP connection first; 'sock' is only assigned once TLS is fully set up.
+  unique_ptr<Socket> conn(new Socket());
+  ASSERT_OK(conn->Init(0));
+  ASSERT_OK(conn->Connect(addr));
+  TlsHandshake handshake;
+  ASSERT_OK(client_tls_.InitiateHandshake(TlsHandshakeType::CLIENT, &handshake));
+  ASSERT_OK(DoNegotiationSide(conn.get(), &handshake, "client"));
+  ASSERT_OK(handshake.Finish(&conn));  // Replaces 'conn' with the TLS socket.
+  *sock = std::move(conn);
+}
+
+// TLS echo server for a single connection, served from a background thread.
+class EchoServer {
+ public:
+  EchoServer() : pthread_sync_(1) {}
+  ~EchoServer() {
+    Stop();
+    Join();
+  }
+
+  void Start() {
+    ASSERT_OK(server_tls_.Init());
+    ASSERT_OK(server_tls_.GenerateSelfSignedCertAndKey());
+    ASSERT_OK(listen_addr_.ParseString("127.0.0.1", 0));
+    ASSERT_OK(listener_.Init(0));
+    ASSERT_OK(listener_.BindAndListen(listen_addr_, /*listen_queue_size=*/10));
+    ASSERT_OK(listener_.GetSocketAddress(&listen_addr_));
+
+    thread_ = thread([&] {
+        pthread_ = pthread_self();  // Published to pthread() callers via the latch.
+        pthread_sync_.CountDown();
+        unique_ptr<Socket> sock(new Socket());
+        Sockaddr remote;
+        CHECK_OK(listener_.Accept(sock.get(), &remote, /*flags=*/0));
+
+        TlsHandshake server;
+        CHECK_OK(server_tls_.InitiateHandshake(TlsHandshakeType::SERVER, &server));
+        CHECK_OK(DoNegotiationSide(sock.get(), &server, "server"));
+        CHECK_OK(server.Finish(&sock));
+
+        CHECK_OK(sock->SetRecvTimeout(kTimeout));
+        unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+        // An "echo" loop for kEchoChunkSize byte buffers.
+        while (!stop_) {
+          size_t n = 0;
+          Status s = sock->BlockingRecv(buf.get(), kEchoChunkSize, &n, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error reading: " << s.ToString();
+            break;  // Shutting down after a failed read: there is nothing valid to echo.
+          }
+
+          LOG(INFO) << "server echoing " << n << " bytes";
+          size_t written;
+          s = sock->BlockingWrite(buf.get(), n, &written, MonoTime::Now() + kTimeout);
+          if (!s.ok()) {
+            CHECK(stop_) << "unexpected error writing: " << s.ToString();
+          }
+          if (slow_read_) {
+            SleepFor(MonoDelta::FromMilliseconds(10));
+          }
+        }
+
+        CHECK_OK(listener_.Close());
+      });
+  }
+
+  void EnableSlowRead() {  // Has to be called before Start(): the flag is not atomic.
+    slow_read_ = true;
+  }
+
+  const Sockaddr& listen_addr() const {
+    return listen_addr_;
+  }
+
+  bool stopped() const {
+    return stop_;
+  }
+
+  void Stop() {
+    stop_ = true;
+  }
+  void Join() {
+    if (thread_.joinable()) thread_.join();  // join() throws if Start() never spawned the thread.
+  }
+
+  const pthread_t& pthread() {
+    pthread_sync_.Wait();  // Blocks until the server thread has stored its id.
+    return pthread_;
+  }
+
+ private:
+  TlsContext server_tls_;
+  Socket listener_;
+  Sockaddr listen_addr_;
+  thread thread_;
+  pthread_t pthread_;
+  CountDownLatch pthread_sync_;
+  std::atomic<bool> stop_ { false };
+
+  bool slow_read_ = false;
+};
+
+void handler(int /* signal */) {}  // Deliberately empty signal handler.
+
+TEST_F(TlsSocketTest, TestRecvFailure) {
+    EchoServer server;
+    server.Start();
+    unique_ptr<Socket> client_sock;
+    NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+    unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+
+    SleepFor(MonoDelta::FromMilliseconds(100));
+    server.Stop();
+
+    size_t nwritten;
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
+        MonoTime::Now() + kTimeout));
+    size_t nread;
+
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
+        MonoTime::Now() + kTimeout));
+
+    Status s = client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &nread,
+        MonoTime::Now() + kTimeout);
+
+    ASSERT_TRUE(!s.ok());
+    ASSERT_TRUE(s.IsNetworkError());
+    ASSERT_STR_MATCHES(s.message().ToString(), "BlockingRecv error: failed to read from "
+                                               "TLS socket \\(remote: 127.0.0.1:[0-9]+\\): ");
+}
+
+// Test for failures to handle EINTR during TLS connection
+// negotiation and data send/receive.
+TEST_F(TlsSocketTest, TestTlsSocketInterrupted) {
+  // Set up a no-op signal handler for SIGUSR2.
+  struct sigaction sa, sa_old;
+  memset(&sa, 0, sizeof(sa));
+  sa.sa_handler = &handler;
+  sigaction(SIGUSR2, &sa, &sa_old);
+  SCOPED_CLEANUP({ sigaction(SIGUSR2, &sa_old, nullptr); });
+
+  EchoServer server;
+  NO_FATALS(server.Start());
+  // Start a thread to send signals to the server thread.
+  thread killer([&]() {
+      while (!server.stopped()) {
+        PCHECK(pthread_kill(server.pthread(), SIGUSR2) == 0);
+        SleepFor(MonoDelta::FromMicroseconds(rand() % 10));
+      }
+    });
+  // Stop the server first: the killer loops until stopped(), so a bare join could hang.
+  SCOPED_CLEANUP({ server.Stop(); killer.join(); });
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  unique_ptr<uint8_t[]> buf(new uint8_t[kEchoChunkSize]);
+  for (int i = 0; i < 10; i++) {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    size_t nwritten;
+    ASSERT_OK(client_sock->BlockingWrite(buf.get(), kEchoChunkSize, &nwritten,
+        MonoTime::Now() + kTimeout));
+    size_t n;
+    ASSERT_OK(client_sock->BlockingRecv(buf.get(), kEchoChunkSize, &n,
+        MonoTime::Now() + kTimeout));
+  }
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+  LOG(INFO) << "client done";
+}
+
+// Splits the 'len' bytes at 'buf' into consecutive iovec entries that together
+// cover the same data. Each entry gets a random length between 1 and
+// 'max_chunk_size' bytes, truncated when fewer bytes remain.
+vector<struct iovec> ChunkIOVec(Random* rng, uint8_t* buf, int len, int max_chunk_size) {
+  vector<struct iovec> chunks;
+  // 'offset' counts the bytes already handed out to earlier chunks.
+  for (int offset = 0; offset < len;) {
+    // The +1 keeps every chunk non-empty (assumes Uniform(k) can yield 0).
+    const int drawn = rng->Uniform(max_chunk_size) + 1;
+    const int chunk_len = std::min(drawn, len - offset);
+    struct iovec entry = {buf + offset, static_cast<size_t>(chunk_len)};
+    chunks.push_back(entry);
+    offset += chunk_len;
+  }
+  return chunks;
+}
+
+// Regression test for KUDU-2218: Writev used to mishandle partial writes when
+// the socket was in non-blocking mode.
+TEST_F(TlsSocketTest, TestNonBlockingWritev) {
+  Random rng(GetRandomSeed32());
+
+  EchoServer server;
+  server.EnableSlowRead();
+  NO_FATALS(server.Start());
+
+  unique_ptr<Socket> client_sock;
+  NO_FATALS(ConnectClient(server.listen_addr(), &client_sock));
+
+  unique_ptr<uint8_t[]> sent(new uint8_t[kEchoChunkSize]);
+  unique_ptr<uint8_t[]> echoed(new uint8_t[kEchoChunkSize]);
+  RandomString(sent.get(), kEchoChunkSize, &rng);
+
+  for (int round = 0; round < 10; ++round) {
+    ASSERT_OK(client_sock->SetNonBlocking(true));
+
+    // Describe the payload as randomly-sized chunks of at most 1 MiB each.
+    vector<struct iovec> chunks = ChunkIOVec(&rng, sent.get(), kEchoChunkSize, 1024 * 1024);
+
+    // Keep calling Writev until every chunk has been consumed.
+    int remaining = kEchoChunkSize;
+    while (remaining > 0) {
+      CHECK(!chunks.empty()) << remaining;
+      int64_t wrote;
+      Status s = client_sock->Writev(chunks.data(), chunks.size(), &wrote);
+      if (Socket::IsTemporarySocketError(s.posix_code())) {
+        sched_yield();
+        continue;
+      }
+      ASSERT_OK(s);
+      ASSERT_LE(wrote, remaining);
+      remaining -= wrote;
+      ASSERT_GE(wrote, 0);
+      while (wrote > 0) {
+        struct iovec& head = chunks.front();
+        if (wrote < head.iov_len) {  // Partially written chunk: skip the sent prefix.
+          head.iov_len -= wrote;
+          head.iov_base = static_cast<uint8_t*>(head.iov_base) + wrote;
+          wrote = 0;
+        } else {  // Fully written chunk: drop it and account for its bytes.
+          wrote -= head.iov_len;
+          chunks.erase(chunks.begin());
+        }
+      }
+    }
+    LOG(INFO) << "client waiting";
+
+    size_t nread;
+    ASSERT_OK(client_sock->SetNonBlocking(false));
+    ASSERT_OK(client_sock->BlockingRecv(echoed.get(), kEchoChunkSize, &nread,
+        MonoTime::Now() + kTimeout));
+    LOG(INFO) << "client got response";
+
+    ASSERT_EQ(0, memcmp(sent.get(), echoed.get(), kEchoChunkSize));
+  }
+
+  server.Stop();
+  ASSERT_OK(client_sock->Close());
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_socket.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.cc b/be/src/kudu/security/tls_socket.cc
new file mode 100644
index 0000000..355f04b
--- /dev/null
+++ b/be/src/kudu/security/tls_socket.cc
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/tls_socket.h"
+
+#include <sys/uio.h>
+
+#include <cerrno>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+#include <openssl/err.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+
+namespace kudu {
+namespace security {
+
+TlsSocket::TlsSocket(int fd, c_unique_ptr<SSL> ssl)
+    : Socket(fd),
+      ssl_(std::move(ssl)) {  // Takes over ownership of the SSL handle.
+}
+
+TlsSocket::~TlsSocket() {
+  ignore_result(Close());  // A destructor cannot report the close status.
+}
+
+Status TlsSocket::Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) {
+  CHECK(ssl_);
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  *nwritten = 0;
+  if (PREDICT_FALSE(amt == 0)) {
+    // An empty write (seen e.g. for responses with an empty sidecar) is
+    // answered right here: SSL_write uses a 0 return for certain error
+    // conditions, so it must never be asked to write nothing.
+    return Status::OK();
+  }
+
+  errno = 0;  // Cleared so that a non-zero value afterwards stems from SSL_write.
+  const int32_t rc = SSL_write(ssl_.get(), buf, amt);
+  const int write_errno = errno;
+  if (rc > 0) {
+    *nwritten = rc;
+    return Status::OK();
+  }
+
+  const auto ssl_error = SSL_get_error(ssl_.get(), rc);
+  if (ssl_error != SSL_ERROR_WANT_WRITE) {
+    return Status::NetworkError("failed to write to TLS socket",
+                                GetSSLErrorDescription(ssl_error));
+  }
+  // WANT_WRITE: surface the errno if there is one, otherwise report "nothing
+  // written yet" through an OK status with *nwritten left at 0.
+  if (write_errno == 0) return Status::OK();
+  return Status::NetworkError("SSL_write error",
+                              ErrnoToString(write_errno), write_errno);
+}
+
+Status TlsSocket::Writev(const struct ::iovec *iov, int iov_len, int64_t *nwritten) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ssl_);
+  *nwritten = 0;
+  // Cork the socket so the frames below can accumulate before being sent. The
+  // loop only exits via 'break' (never 'return') so TCP_CORK is always unset.
+  RETURN_NOT_OK(SetTcpCork(1));
+  Status last_status = Status::OK();
+  for (int idx = 0; idx < iov_len; ++idx) {
+    const struct ::iovec& frame = iov[idx];
+    const int32_t frame_len = frame.iov_len;
+    int32_t frame_written;
+    last_status = Write(static_cast<uint8_t*>(frame.iov_base), frame_len, &frame_written);
+    if (!last_status.ok()) break;
+    // Account for what went out; a short write ends this call.
+    *nwritten += frame_written;
+    if (frame_written < frame_len) break;
+  }
+  RETURN_NOT_OK(SetTcpCork(0));
+  // A temporary socket error that follows some successful output is reported
+  // as a successful _partial_ write.
+  const bool made_progress = *nwritten > 0;
+  if (made_progress && Socket::IsTemporarySocketError(last_status.posix_code())) {
+    return Status::OK();
+  }
+  return last_status;
+}
+
+Status TlsSocket::Recv(uint8_t *buf, int32_t amt, int32_t *nread) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ssl_);
+  *nread = 0;  // Keep the out-parameter defined on every error path, as Write() does.
+  errno = 0;
+  int32_t bytes_read = SSL_read(ssl_.get(), buf, amt);
+  int save_errno = errno;
+  if (bytes_read <= 0) {
+    // The peer lookup can itself fail; say so instead of printing a bogus address.
+    Sockaddr remote;
+    const std::string remote_str =
+        Socket::GetPeerAddress(&remote).ok() ? remote.ToString() : "unknown";
+    const std::string err_prefix =
+        strings::Substitute("failed to read from TLS socket (remote: $0)", remote_str);
+    if (bytes_read == 0 && SSL_get_shutdown(ssl_.get()) == SSL_RECEIVED_SHUTDOWN) {
+      return Status::NetworkError(err_prefix, ErrnoToString(ESHUTDOWN), ESHUTDOWN);
+    }
+    auto error_code = SSL_get_error(ssl_.get(), bytes_read);
+    if (error_code == SSL_ERROR_WANT_READ) {
+      if (save_errno != 0) {
+        return Status::NetworkError("SSL_read error from " + remote_str,
+                                    ErrnoToString(save_errno), save_errno);
+      }
+      // Nothing available to read yet; *nread is already 0.
+      return Status::OK();
+    }
+    if (error_code == SSL_ERROR_SYSCALL && ERR_peek_error() == 0) {
+      // From the OpenSSL docs:
+      //   Some I/O error occurred.  The OpenSSL error queue may contain more
+      //   information on the error.  If the error queue is empty (i.e.
+      //   ERR_get_error() returns 0), ret can be used to find out more about
+      //   the error: If ret == 0, an EOF was observed that violates the pro-
+      //   tocol.  If ret == -1, the underlying BIO reported an I/O error (for
+      //   socket I/O on Unix systems, consult errno for details).
+      if (bytes_read == 0) {
+        // "EOF was observed that violates the protocol" (eg the other end disconnected)
+        return Status::NetworkError(err_prefix, ErrnoToString(ECONNRESET), ECONNRESET);
+      }
+      if (bytes_read == -1 && save_errno != 0) {
+        return Status::NetworkError(err_prefix, ErrnoToString(save_errno), save_errno);
+      }
+      return Status::NetworkError(err_prefix, "unknown ERROR_SYSCALL");
+    }
+    return Status::NetworkError(err_prefix, GetSSLErrorDescription(error_code));
+  }
+  *nread = bytes_read;
+  return Status::OK();
+}
+
+// Starts the TLS shutdown and then closes the underlying socket.
+Status TlsSocket::Close() {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  errno = 0;
+
+  // A null SSL handle means an earlier Close() already ran; nothing to do.
+  if (!ssl_) return Status::OK();
+
+  // Only initiate the TLS shutdown. The peer's response is not awaited because
+  // the underlying socket will not be reused.
+  Status tls_status = Status::OK();
+  const int32_t shutdown_rc = SSL_shutdown(ssl_.get());
+  if (shutdown_rc < 0) {
+    const auto ssl_error = SSL_get_error(ssl_.get(), shutdown_rc);
+    tls_status = Status::NetworkError("TlsSocket::Close", GetSSLErrorDescription(ssl_error));
+  }
+
+  // Release the SSL handle before closing the underlying socket.
+  ssl_.reset();
+
+  // A failure to close the socket takes precedence; the TLS shutdown result
+  // is reported only if the socket itself closed cleanly.
+  const Status socket_status = Socket::Close();
+  if (!socket_status.ok()) return socket_status;
+  return tls_status;
+}
+
+} // namespace security
+} // namespace kudu