You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/07 14:52:10 UTC

[kudu] 02/03: KUDU-2871 support TLSv1.3 in Kudu RPC (C++ part)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit df6590d26de51d67e178c59b50dfcda6ea1244a7
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Feb 22 21:08:37 2021 -0800

    KUDU-2871 support TLSv1.3 in Kudu RPC (C++ part)
    
    With this patch Kudu servers and Kudu C++ client can use TLSv1.3
    for securing Kudu RPC communication.  All tests run between C++
    components now use TLSv1.3 when compiled with OpenSSL 1.1.1 or newer.
    
    This patch introduces two new server-side flags:
      --rpc_tls_ciphersuites
      --rpc_tls_excluded_protocols
    
    The former is to customize the list of preferred ciphers for TLSv1.3.
    The latter is to control the set of TLS protocols used by Kudu servers
    when securing connections for RPC communication.  It can be used along
    with the --rpc_tls_min_protocol flag to define the set of TLS protocols
    available to Kudu servers when running a TLS handshake.  For example,
    set --rpc_tls_excluded_protocols=TLSv1.3 to exclude TLSv1.3 from
    available options for securing Kudu RPC.
    
    In addition, I added a validator for the --rpc_tls_min_protocol flag,
    similar to the validator for --rpc_tls_excluded_protocols.
    
    This patch also adds a new test to exercise TLSv1.3 handshake in
    the context of establishing a connection for Kudu RPC.
    
    The embedded webserver isn't updated yet with TLSv1.3 cipher controls.
    That will be done in a separate patch because it requires pushing an
    update to the squeasel webserver to introduce a new TLSv1.3-specific
    flag and make a call to SSL_CTX_set_ciphersuites() correspondingly.
    
    Change-Id: Ia92a4d102c3c8cff76101e71ff71d24a9d78b672
    Reviewed-on: http://gerrit.cloudera.org:8080/17189
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 src/kudu/rpc/client_negotiation.cc      |   3 +-
 src/kudu/rpc/messenger.cc               |  30 ++++---
 src/kudu/rpc/messenger.h                |  42 +++++++--
 src/kudu/rpc/server_negotiation.cc      |  26 ++++--
 src/kudu/security/security-test-util.cc |  19 ++--
 src/kudu/security/security_flags.cc     |  24 ++++--
 src/kudu/security/security_flags.h      |   6 +-
 src/kudu/security/tls_context.cc        | 109 ++++++++++++++++++-----
 src/kudu/security/tls_context.h         |  35 ++++++--
 src/kudu/security/tls_handshake-test.cc | 148 ++++++++++++++++++++++++++++++--
 src/kudu/security/tls_handshake.cc      |  92 ++++++++++++++++++--
 src/kudu/security/tls_handshake.h       |  23 ++++-
 src/kudu/server/server_base.cc          |  71 +++++++++++++--
 13 files changed, 529 insertions(+), 99 deletions(-)

diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index ef9b368..1b50fac 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -496,8 +496,7 @@ Status ClientNegotiation::HandleTlsHandshake(const NegotiatePB& response) {
 
   string token;
   Status s = tls_handshake_.Continue(response.tls_handshake(), &token);
-  if (s.IsIncomplete()) {
-    // Another roundtrip is required to complete the handshake.
+  if (tls_handshake_.NeedsExtraStep(s, token)) {
     RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
   }
 
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index aea9ab8..87a7bae 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -77,6 +77,7 @@ MessengerBuilder::MessengerBuilder(string name)
       rpc_authentication_("optional"),
       rpc_encryption_("optional"),
       rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      rpc_tls_ciphersuites_(kudu::security::SecurityDefaults::kDefaultTlsCipherSuites),
       rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
       enable_inbound_tls_(false),
       reuseport_(false) {
@@ -307,19 +308,22 @@ void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote
 }
 
 Messenger::Messenger(const MessengerBuilder &bld)
-  : name_(bld.name_),
-    state_(kStarted),
-    authentication_(RpcAuthentication::REQUIRED),
-    encryption_(RpcEncryption::REQUIRED),
-    tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
-    token_verifier_(new security::TokenVerifier()),
-    rpcz_store_(new RpczStore()),
-    metric_entity_(bld.metric_entity_),
-    rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
-    sasl_proto_name_(bld.sasl_proto_name_),
-    keytab_file_(bld.keytab_file_),
-    reuseport_(bld.reuseport_),
-    retain_self_(this) {
+    : name_(bld.name_),
+      state_(kStarted),
+      authentication_(RpcAuthentication::REQUIRED),
+      encryption_(RpcEncryption::REQUIRED),
+      tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_,
+                                            bld.rpc_tls_ciphersuites_,
+                                            bld.rpc_tls_min_protocol_,
+                                            bld.rpc_tls_excluded_protocols_)),
+      token_verifier_(new security::TokenVerifier),
+      rpcz_store_(new RpczStore),
+      metric_entity_(bld.metric_entity_),
+      rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
+      sasl_proto_name_(bld.sasl_proto_name_),
+      keytab_file_(bld.keytab_file_),
+      reuseport_(bld.reuseport_),
+      retain_self_(this) {
   for (int i = 0; i < bld.num_reactors_; i++) {
     reactors_.push_back(new Reactor(retain_self_, i, bld));
   }
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index b1e22de..983d959 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -22,6 +22,7 @@
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -159,21 +160,48 @@ class MessengerBuilder {
     return *this;
   }
 
-  // Set the cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
-  // cipher preference list format. See man (1) ciphers for more information.
+  // Set TLSv1.2 and earlier cipher suite preferences to use for TLS-secured RPC
+  // connections. Uses the OpenSSL cipher preference list format. Under the
+  // hood, SSL_CTX_set_cipher_list() is eventually being called with
+  // 'rpc_tls_ciphers'. See 'man (1) ciphers' for more information on the syntax
+  // of the cipher suite preference list and
+  // https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_ciphersuites.html
+  // for SSL_CTX_set_cipher_list() API details.
   MessengerBuilder& set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers) {
     rpc_tls_ciphers_ = rpc_tls_ciphers;
     return *this;
   }
 
-  // Set the minimum protocol version to allow when for securing RPC connections with TLS. May be
-  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
-  MessengerBuilder& set_rpc_tls_min_protocol(
+  // Set TLSv1.3-specific cipher suite preferences to use for TLS-secured RPC
+  // connections. Uses the OpenSSL ciphersuite preference list format for
+  // TLSv1.3. Under the hood, SSL_CTX_set_ciphersuites() is eventually being
+  // called with 'rpc_tls_ciphersuites'. See 'man (1) ciphers' for more
+  // information on the TLSv1.3-specific syntax for the cipher suite preference
+  // list and
+  // https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_ciphersuites.html
+  // for SSL_CTX_set_ciphersuites() API details.
+  MessengerBuilder &set_rpc_tls_ciphersuites(
+      const std::string& rpc_tls_ciphersuites) {
+    rpc_tls_ciphersuites_ = rpc_tls_ciphersuites;
+    return *this;
+  }
+
+  // Set the minimum protocol version to allow for securing RPC connections
+  // with TLS. May be one of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'.
+  MessengerBuilder &set_rpc_tls_min_protocol(
       const std::string& rpc_tls_min_protocol) {
     rpc_tls_min_protocol_ = rpc_tls_min_protocol;
     return *this;
   }
 
+  // Set the list of TLS protocols to avoid when securing RPC connections. The
+  // elements might be from the list of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'.
+  MessengerBuilder& set_rpc_tls_excluded_protocols(
+      std::vector<std::string> rpc_tls_excluded_protocols) {
+    rpc_tls_excluded_protocols_ = std::move(rpc_tls_excluded_protocols);
+    return *this;
+  }
+
   // Set the TLS server certificate and private key files paths. If this is set in conjunction
   // with enable_inbound_tls(), internal PKI will not be used for encrypted communication and
   // external PKI will be used instead.
@@ -233,8 +261,10 @@ class MessengerBuilder {
   std::string sasl_proto_name_;
   std::string rpc_authentication_;
   std::string rpc_encryption_;
-  std::string rpc_tls_ciphers_;
+  std::string rpc_tls_ciphers_;       // pre-TLSv1.3 cipher suites
+  std::string rpc_tls_ciphersuites_;  // TLSv1.3-related cipher suites
   std::string rpc_tls_min_protocol_;
+  std::vector<std::string> rpc_tls_excluded_protocols_;
   std::string rpc_certificate_file_;
   std::string rpc_private_key_file_;
   std::string rpc_ca_certificate_file_;
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 61ba0ed..5cf8c42 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -236,8 +236,12 @@ Status ServerNegotiation::Negotiate() {
       NegotiatePB request;
       RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
       Status s = HandleTlsHandshake(request);
-      if (s.ok()) break;
-      if (!s.IsIncomplete()) return s;
+      if (s.ok()) {
+        break;
+      }
+      if (!s.IsIncomplete()) {
+        return s;
+      }
     }
     tls_negotiated_ = true;
   }
@@ -593,18 +597,26 @@ Status ServerNegotiation::HandleTlsHandshake(const NegotiatePB& request) {
   }
 
   string token;
-  Status s = tls_handshake_.Continue(request.tls_handshake(), &token);
-
+  const Status s = tls_handshake_.Continue(request.tls_handshake(), &token);
   if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
     RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
     return s;
   }
+  const bool needs_extra_step = tls_handshake_.NeedsExtraStep(s, token);
+  if (needs_extra_step) {
+    RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+  }
 
-  // Regardless of whether this is the final handshake roundtrip (in which case
-  // Continue would have returned OK), we still need to return a response.
-  RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+  // Check that the handshake step didn't produce an error. It also propagates
+  // any non-OK status.
   RETURN_NOT_OK(s);
 
+  if (!needs_extra_step && !token.empty()) {
+    DCHECK(s.ok());
+    DCHECK(!token.empty());
+    tls_handshake_.StorePendingData(std::move(token));
+  }
+
   // TLS handshake is finished.
   if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
       ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
diff --git a/src/kudu/security/security-test-util.cc b/src/kudu/security/security-test-util.cc
index 40b0938..b6e6340 100644
--- a/src/kudu/security/security-test-util.cc
+++ b/src/kudu/security/security-test-util.cc
@@ -29,14 +29,14 @@
 #include "kudu/security/tls_context.h"
 #include "kudu/util/test_util.h"
 
+using kudu::security::ca::CaCertRequestGenerator;
+using kudu::security::ca::CertSigner;
+
 namespace kudu {
 namespace security {
 
-using ca::CaCertRequestGenerator;
-using ca::CertSigner;
-
 Status GenerateSelfSignedCAForTests(PrivateKey* ca_key, Cert* ca_cert) {
-  static const int64_t kRootCaCertExpirationSeconds = 24 * 60 * 60;
+  constexpr int64_t kRootCaCertExpirationSeconds = 24 * 60 * 60;
   // Create a key for the self-signed CA.
   //
   // OpenSSL has a concept of "security levels" which, amongst other things,
@@ -48,13 +48,10 @@ Status GenerateSelfSignedCAForTests(PrivateKey* ca_key, Cert* ca_cert) {
   // See https://www.openssl.org/docs/man1.1.0/ssl/SSL_CTX_get_security_level.html
   // for more details.
   RETURN_NOT_OK(GeneratePrivateKey(1024, ca_key));
-
-  CaCertRequestGenerator::Config config = { "test-ca-cn" };
-  RETURN_NOT_OK(CertSigner::SelfSignCA(*ca_key,
-                                       config,
-                                       kRootCaCertExpirationSeconds,
-                                       ca_cert));
-  return Status::OK();
+  return CertSigner::SelfSignCA(*ca_key,
+                                CaCertRequestGenerator::Config{ "test-ca-cn" },
+                                kRootCaCertExpirationSeconds,
+                                ca_cert);
 }
 
 std::ostream& operator<<(std::ostream& o, PkiConfig c) {
diff --git a/src/kudu/security/security_flags.cc b/src/kudu/security/security_flags.cc
index acdd662..bfd09fb 100644
--- a/src/kudu/security/security_flags.cc
+++ b/src/kudu/security/security_flags.cc
@@ -26,15 +26,23 @@ namespace security {
 // list. These additional ciphers maintain compatibility with RHEL 6.5 and
 // below. The DH AES ciphers are not included since we are not configured to
 // use DH key agreement.
+// TODO(aserbin): refresh the list to drop RHEL6/CentOS6 ciphers and
+//                sync it with https://wiki.mozilla.org/Security/Server_Side_TLS
 const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsCiphers =
-                                   "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
-                                   "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
-                                   "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
-                                   "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
-                                   "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
-                                   "AES256-GCM-SHA384:AES128-GCM-SHA256:"
-                                   "AES256-SHA256:AES128-SHA256:"
-                                   "AES256-SHA:AES128-SHA";
+    "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
+    "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
+    "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
+    "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
+    "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
+    "AES256-GCM-SHA384:AES128-GCM-SHA256:"
+    "AES256-SHA256:AES128-SHA256:"
+    "AES256-SHA:AES128-SHA";
+
+// This is the "modern compatibility" TLSv1.3 cipher list of the Mozilla
+// Security Server Side TLS recommendations, accessed March 2021.
+// https://wiki.mozilla.org/Security/Server_Side_TLS
+const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsCipherSuites =
+    "TLS_AES_128_GCM_SHA256:TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256";
 
 const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsMinVersion = "TLSv1";
 
diff --git a/src/kudu/security/security_flags.h b/src/kudu/security/security_flags.h
index e64536d..06e6790 100644
--- a/src/kudu/security/security_flags.h
+++ b/src/kudu/security/security_flags.h
@@ -28,7 +28,11 @@ typedef TriStateFlag RpcAuthentication;
 typedef TriStateFlag RpcEncryption;
 
 struct SecurityDefaults {
-  static const char* const kDefaultTlsCiphers;
+  // The names for the 'kDefaultTlsCiphers' and 'kDefaultTlsCipherSuites'
+  // constants are confusingly close, but 'kDefaultTlsCiphers' is likely
+  // to be removed when obsoleting TLSv1.2 at some point in the future.
+  static const char* const kDefaultTlsCiphers;      // pre-TLSv1.3 ciphers
+  static const char* const kDefaultTlsCipherSuites; // TLSv1.3 and later ciphers
   static const char* const kDefaultTlsMinVersion;
 };
 
diff --git a/src/kudu/security/tls_context.cc b/src/kudu/security/tls_context.cc
index 87e58e6..23826fa 100644
--- a/src/kudu/security/tls_context.cc
+++ b/src/kudu/security/tls_context.cc
@@ -68,6 +68,9 @@
 #ifndef SSL_OP_NO_TLSv1_1
 #define SSL_OP_NO_TLSv1_1 0x10000000U
 #endif
+#ifndef SSL_OP_NO_TLSv1_2
+#define SSL_OP_NO_TLSv1_2 0x08000000U
+#endif
 #ifndef SSL_OP_NO_TLSv1_3
 #define SSL_OP_NO_TLSv1_3 0x20000000U
 #endif
@@ -77,12 +80,15 @@
 #ifndef TLS1_2_VERSION
 #define TLS1_2_VERSION 0x0303
 #endif
+#ifndef TLS1_3_VERSION
+#define TLS1_3_VERSION 0x0304
+#endif
 
 using kudu::security::ca::CertRequestGenerator;
-using strings::Substitute;
 using std::string;
 using std::unique_lock;
 using std::vector;
+using strings::Substitute;
 
 DEFINE_int32(ipki_server_key_size, 2048,
              "the number of bits for server cert's private key. The server cert "
@@ -107,10 +113,15 @@ template<> struct SslTypeTraits<X509_STORE_CTX> {
 
 namespace {
 
+constexpr const char* const kTLSv1 = "TLSv1";
+constexpr const char* const kTLSv1_1 = "TLSv1.1";
+constexpr const char* const kTLSv1_2 = "TLSv1.2";
+constexpr const char* const kTLSv1_3 = "TLSv1.3";
+
 Status CheckMaxSupportedTlsVersion(int tls_version, const char* tls_version_str) {
-  // OpenSSL 1.1 and newer supports all of the TLS versions we care about, so
+  // OpenSSL 1.1.1 and newer supports all of the TLS versions we care about, so
   // the below check is only necessary in older versions of OpenSSL.
-#if OPENSSL_VERSION_NUMBER < 0x10100000L
+#if OPENSSL_VERSION_NUMBER < 0x10101000L
   auto max_supported_tls_version = SSLv23_method()->version;
   DCHECK_GE(max_supported_tls_version, TLS1_VERSION);
 
@@ -127,6 +138,7 @@ Status CheckMaxSupportedTlsVersion(int tls_version, const char* tls_version_str)
 
 TlsContext::TlsContext()
     : tls_ciphers_(SecurityDefaults::kDefaultTlsCiphers),
+      tls_ciphersuites_(SecurityDefaults::kDefaultTlsCipherSuites),
       tls_min_protocol_(SecurityDefaults::kDefaultTlsMinVersion),
       lock_(RWMutex::Priority::PREFER_READING),
       trusted_cert_count_(0),
@@ -135,9 +147,14 @@ TlsContext::TlsContext()
   security::InitializeOpenSSL();
 }
 
-TlsContext::TlsContext(std::string tls_ciphers, std::string tls_min_protocol)
+TlsContext::TlsContext(std::string tls_ciphers,
+                       std::string tls_ciphersuites,
+                       std::string tls_min_protocol,
+                       std::vector<std::string> tls_excluded_protocols)
     : tls_ciphers_(std::move(tls_ciphers)),
+      tls_ciphersuites_(std::move(tls_ciphersuites)),
       tls_min_protocol_(std::move(tls_min_protocol)),
+      tls_excluded_protocols_(std::move(tls_excluded_protocols)),
       lock_(RWMutex::Priority::PREFER_READING),
       trusted_cert_count_(0),
       has_cert_(false),
@@ -158,9 +175,11 @@ Status TlsContext::Init() {
   if (!ctx_) {
     return Status::RuntimeError("failed to create TLS context", GetOpenSSLErrors());
   }
-  SSL_CTX_set_mode(
-      ctx_.get(),
-      SSL_MODE_AUTO_RETRY | SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
+  auto* ctx = ctx_.get();
+  SSL_CTX_set_mode(ctx,
+                   SSL_MODE_AUTO_RETRY |
+                   SSL_MODE_ENABLE_PARTIAL_WRITE |
+                   SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
 
   // Disable SSLv2 and SSLv3 which are vulnerable to various issues such as POODLE.
   // We support versions back to TLSv1.0 since OpenSSL on RHEL 6.4 and earlier does not
@@ -171,15 +190,17 @@ Status TlsContext::Init() {
   //   https://tools.ietf.org/html/rfc7525#section-3.3
   auto options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION;
 
-  if (iequals(tls_min_protocol_, "TLSv1.2")) {
-    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_2_VERSION, "TLSv1.2"));
+  if (iequals(tls_min_protocol_, kTLSv1_3)) {
+    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_3_VERSION, kTLSv1_3));
+    options |= SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1 | SSL_OP_NO_TLSv1_2;
+  } else if (iequals(tls_min_protocol_, kTLSv1_2)) {
+    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_2_VERSION, kTLSv1_2));
     options |= SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1;
-  } else if (iequals(tls_min_protocol_, "TLSv1.1")) {
-    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_1_VERSION, "TLSv1.1"));
+  } else if (iequals(tls_min_protocol_, kTLSv1_1)) {
+    RETURN_NOT_OK(CheckMaxSupportedTlsVersion(TLS1_1_VERSION, kTLSv1_1));
     options |= SSL_OP_NO_TLSv1;
-  } else if (!iequals(tls_min_protocol_, "TLSv1")) {
-    return Status::InvalidArgument("unknown value provided for --rpc_tls_min_protocol flag",
-                                   tls_min_protocol_);
+  } else if (!iequals(tls_min_protocol_, kTLSv1)) {
+    return Status::InvalidArgument("unknown TLS protocol", tls_min_protocol_);
   }
 
 #if OPENSSL_VERSION_NUMBER > 0x1010007fL
@@ -198,11 +219,27 @@ Status TlsContext::Init() {
   options |= SSL_OP_NO_RENEGOTIATION;
 #endif
 
-  // We don't currently support TLS 1.3 because the one-and-a-half-RTT negotiation
-  // confuses our RPC negotiation protocol. See KUDU-2871.
-  options |= SSL_OP_NO_TLSv1_3;
+  for (const auto& proto : tls_excluded_protocols_) {
+    if (iequals(proto, kTLSv1_3)) {
+      options |= SSL_OP_NO_TLSv1_3;
+      continue;
+    }
+    if (iequals(proto, kTLSv1_2)) {
+      options |= SSL_OP_NO_TLSv1_2;
+      continue;
+    }
+    if (iequals(proto, kTLSv1_1)) {
+      options |= SSL_OP_NO_TLSv1_1;
+      continue;
+    }
+    if (iequals(proto, kTLSv1)) {
+      options |= SSL_OP_NO_TLSv1;
+      continue;
+    }
+    return Status::InvalidArgument("unknown TLS protocol", proto);
+  }
 
-  SSL_CTX_set_options(ctx_.get(), options);
+  SSL_CTX_set_options(ctx, options);
 
   // Disable the TLS session cache on both the client and server sides. In Kudu
   // RPC, connections are not re-established based on TLS sessions anyway. Every
@@ -211,11 +248,37 @@ Status TlsContext::Init() {
   // resources to store TLS session information and running the automatic check
   // for expired sessions every 255 connections, as mentioned at
   // https://www.openssl.org/docs/manmaster/man3/SSL_CTX_set_session_cache_mode.html
-  SSL_CTX_set_session_cache_mode(ctx_.get(), SSL_SESS_CACHE_OFF);
+  SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF);
 
+  // The sequence of SSL_CTX_set_ciphersuites() and SSL_CTX_set_cipher_list()
+  // calls below is essential to make sure the TLS engine ends up with a usable,
+  // non-empty set of ciphers in case of early 1.1.1 releases of OpenSSL
+  // (like OpenSSL 1.1.1 shipped with Ubuntu 18).
+  //
+  // The SSL_CTX_set_ciphersuites() call cares only about TLSv1.3 ciphers, and
+  // those might be none. On the other hand, the implementation of
+  // SSL_CTX_set_cipher_list() verifies that the overall result list of ciphers
+  // is valid and usable, reporting an error otherwise.
+  //
+  // If the sequence is reversed, no error would be reported from
+  // TlsContext::Init() in case of empty list of ciphers for some early-1.1.1
+  // releases of OpenSSL. That's because SSL_CTX_set_cipher_list() would see
+  // a non-empty list of default TLSv1.3 ciphers when given an empty list of
+  // TLSv1.2 ciphers, and SSL_CTX_set_ciphersuites() would allow an empty set
+  // of TLSv1.3 ciphers in a subsequent call.
+
+#if OPENSSL_VERSION_NUMBER >= 0x10101000L
+  // Set TLSv1.3 ciphers.
   OPENSSL_RET_NOT_OK(
-      SSL_CTX_set_cipher_list(ctx_.get(), tls_ciphers_.c_str()),
-      "failed to set TLS ciphers");
+      SSL_CTX_set_ciphersuites(ctx, tls_ciphersuites_.c_str()),
+      Substitute("failed to set TLSv1.3 ciphers: $0", tls_ciphersuites_));
+#endif
+
+  // It's OK to configure pre-TLSv1.3 ciphers even if all pre-TLSv1.3 protocols
+  // are disabled. At least, SSL_CTX_set_cipher_list() call doesn't report
+  // any errors.
+  OPENSSL_RET_NOT_OK(SSL_CTX_set_cipher_list(ctx, tls_ciphers_.c_str()),
+                     Substitute("failed to set TLS ciphers: $0", tls_ciphers_));
 
 #if OPENSSL_VERSION_NUMBER >= 0x10100000L
   // OpenSSL 1.1 and newer supports the 'security level' concept:
@@ -252,8 +315,8 @@ Status TlsContext::Init() {
   // the best curve to use.
   OPENSSL_RET_NOT_OK(SSL_CTX_set_ecdh_auto(ctx_.get(), 1),
                      "failed to configure ECDH support");
-#endif
-#endif
+#endif // #if OPENSSL_VERSION_NUMBER < 0x10002000L ... #elif ...
+#endif // #ifndef OPENSSL_NO_ECDH ...
 
   return Status::OK();
 }
diff --git a/src/kudu/security/tls_context.h b/src/kudu/security/tls_context.h
index a13e838..ef78497 100644
--- a/src/kudu/security/tls_context.h
+++ b/src/kudu/security/tls_context.h
@@ -71,7 +71,19 @@ class TlsContext {
 
   TlsContext();
 
-  TlsContext(std::string tls_ciphers, std::string tls_min_protocol);
+  // Create TLS context using the specified parameters:
+  //  * tls_ciphers
+  //      cipher suites preference list for TLSv1.2 and prior versions
+  //  * tls_ciphersuites
+  //      cipher suites preference list for TLSv1.3
+  //  * tls_min_protocol
+  //      minimum TLS protocol version to enable
+  //  * tls_excluded_protocols
+  //      TLS protocol versions to exclude from the list of enabled ones
+  TlsContext(std::string tls_ciphers,
+             std::string tls_ciphersuites,
+             std::string tls_min_protocol,
+             std::vector<std::string> tls_excluded_protocols = {});
 
   ~TlsContext() = default;
 
@@ -175,14 +187,27 @@ class TlsContext {
 
   Status VerifyCertChainUnlocked(const Cert& cert) WARN_UNUSED_RESULT;
 
-  // The cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
-  // cipher preference list format. See man (1) ciphers for more information.
+  // The cipher suite preferences to use for RPC connections secured with
+  // pre-TLSv1.3 protocols. Uses the OpenSSL cipher preference list format.
+  // See man (1) ciphers for more information.
   std::string tls_ciphers_;
 
-  // The minimum protocol version to allow when for securing RPC connections with TLS. May be
-  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  // TLSv1.3-specific ciphersuites. These are controlled separately from the
+  // pre-TLSv1.3 ones because the OpenSSL API provides separate calls to set
+  // those and the syntax for the TLSv1.3 list differs from the syntax for
+  // the legacy pre-TLSv1.3 ones. See man (1) ciphers for more information.
+  std::string tls_ciphersuites_;
+
+  // The minimum protocol version to allow for securing RPC connections with
+  // TLS. May be one of 'TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3'.
   std::string tls_min_protocol_;
 
+  // TLS protocol versions to exclude from the list of acceptable ones to secure
+  // RPC connections with TLS. An empty container means the set of acceptable
+  // protocol versions is defined by 'tls_min_protocol_' and the OpenSSL library
+  // itself.
+  std::vector<std::string> tls_excluded_protocols_;
+
   // Protects all members.
   //
   // Taken in write mode when any changes are modifying the underlying SSL_CTX
diff --git a/src/kudu/security/tls_handshake-test.cc b/src/kudu/security/tls_handshake-test.cc
index c3dc94c..c53171f 100644
--- a/src/kudu/security/tls_handshake-test.cc
+++ b/src/kudu/security/tls_handshake-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/security/tls_handshake.h"
 
+#include <openssl/crypto.h>
 #include <openssl/ssl.h>
 
 #include <atomic>
@@ -35,6 +36,7 @@
 #include "kudu/security/crypto.h"
 #include "kudu/security/openssl_util.h"
 #include "kudu/security/security-test-util.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -116,7 +118,6 @@ class TestTlsHandshakeBase : public KuduTest {
         }
       }
       if (!server_done) {
-        CHECK(!client_done);
         Status s = server.Continue(to_server, &to_client);
         VLOG(1) << "server->client: " << to_client.size() << " bytes";
         if (s.ok()) {
@@ -241,33 +242,50 @@ TEST_P(TestTlsHandshakeConcurrent, TestConcurrentAdoptCert) {
   SleepFor(MonoDelta::FromMilliseconds(10));
 }
 
-TEST_F(TestTlsHandshake, TestHandshakeSequence) {
+TEST_F(TestTlsHandshake, HandshakeSequenceNoTLSv1dot3) {
+  static const vector<string> kTlsExcludedProtocols = { "TLSv1.3" };
+
   PrivateKey ca_key;
   Cert ca_cert;
   ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
 
+  TlsContext client_tls(SecurityDefaults::kDefaultTlsCiphers,
+                        SecurityDefaults::kDefaultTlsCipherSuites,
+                        SecurityDefaults::kDefaultTlsMinVersion,
+                        kTlsExcludedProtocols);
+  ASSERT_OK(client_tls.Init());
+
+  TlsContext server_tls(SecurityDefaults::kDefaultTlsCiphers,
+                        SecurityDefaults::kDefaultTlsCipherSuites,
+                        SecurityDefaults::kDefaultTlsMinVersion,
+                        kTlsExcludedProtocols);
+  ASSERT_OK(server_tls.Init());
+
   // Both client and server have certs and CA.
-  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &client_tls_));
-  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &server_tls_));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &client_tls));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &server_tls));
 
   TlsHandshake server(TlsHandshakeType::SERVER);
-  ASSERT_OK(client_tls_.InitiateHandshake(&server));
+  ASSERT_OK(client_tls.InitiateHandshake(&server));
   TlsHandshake client(TlsHandshakeType::CLIENT);
-  ASSERT_OK(server_tls_.InitiateHandshake(&client));
+  ASSERT_OK(server_tls.InitiateHandshake(&client));
 
   string buf1;
   string buf2;
 
   // Client sends Hello
-  ASSERT_TRUE(client.Continue(buf1, &buf2).IsIncomplete());
+  auto s = client.Continue(buf1, &buf2);
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
   ASSERT_GT(buf2.size(), 0);
 
   // Server receives client Hello, and sends server Hello
-  ASSERT_TRUE(server.Continue(buf2, &buf1).IsIncomplete());
+  s = server.Continue(buf2, &buf1);
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
   ASSERT_GT(buf1.size(), 0);
 
   // Client receives server Hello and sends client Finished
-  ASSERT_TRUE(client.Continue(buf1, &buf2).IsIncomplete());
+  s = client.Continue(buf1, &buf2);
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
   ASSERT_GT(buf2.size(), 0);
 
   // Server receives client Finished and sends server Finished
@@ -292,6 +310,118 @@ TEST_F(TestTlsHandshake, TestHandshakeSequence) {
   NO_FATALS(ReadAndCompare(server_ssl, "bye"));
 }
 
+#if OPENSSL_VERSION_NUMBER >= 0x10101000L
+// This scenario is specific to TLSv1.3 handshake negotiation, so it's enabled
+// only if the OpenSSL library supports TLSv1.3.
+TEST_F(TestTlsHandshake, HandshakeSequenceTLSv1dot3) {
+  // NOTE: since --rpc_tls_min_protocol=TLSv1.3 flag isn't added, this scenario
+  //       also verifies that the client and the server sides behave as expected
+  //       by choosing TLSv1.3 in accordance to the cipher suite preference list
+  //       specified by the --rpc_tls_ciphersuites and --rpc_tls_ciphers flags.
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Server has certificate signed by CA, client has self-signed certificate.
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SELF_SIGNED, ca_cert, ca_key, &client_tls_));
+  ASSERT_OK(ConfigureTlsContext(PkiConfig::SIGNED, ca_cert, ca_key, &server_tls_));
+
+  TlsHandshake server(TlsHandshakeType::SERVER);
+  ASSERT_OK(client_tls_.InitiateHandshake(&server));
+  // The client has a self-signed certificate, so the server has nothing to
+  // verify during the connection negotiation.
+  server.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+
+  TlsHandshake client(TlsHandshakeType::CLIENT);
+  ASSERT_OK(server_tls_.InitiateHandshake(&client));
+  // That's the first connection from the client to the server/cluster, and the
+  // client hasn't yet seen a certificate of this Kudu cluster, hence it cannot
+  // verify the server's certificate even though that certificate is valid and
+  // signed by the cluster's CA private key.
+  client.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+
+  auto* client_ssl = client.ssl();
+  auto* server_ssl = server.ssl();
+
+  string buf1;
+  string buf2;
+
+  // Client sends "Hello" (supported ciphersuites, keyshares, etc.).
+  auto s = client.Continue(buf1, &buf2);
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+  ASSERT_GT(buf2.size(), 0);
+
+  // Server receives client's "Hello", chooses cipher, calculates keyshares,
+  // encrypts certificate, Finish messages and sends all this back to client.
+  s = server.Continue(buf2, &buf1);
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+  ASSERT_GT(buf1.size(), 0);
+
+  // Client receives server's Hello and Finish messages.
+  ASSERT_OK(client.Continue(buf1, &buf2));
+  ASSERT_GT(buf2.size(), 0);
+
+  // This isn't a part of the TLSv1.3 handshake you'd see in the docs, but it's
+  // here due to the nature of the step-by-step connection negotiation protocol
+  // used by Kudu RPC. In the wild (i.e. if using the direct connection over the
+  // network, not the intermediate SASL framework), 'buf2' data would get to the
+  // server side with encrypted data from the very first request sent by the
+  // client to the server.
+  ASSERT_OK(server.Continue(buf2, &buf1));
+  ASSERT_GT(buf1.size(), 0);
+
+  // An extra sanity check: since the initial phase of the handshake is done,
+  // make sure it's indeed TLSv1.3 protocol, as expected.
+  ASSERT_EQ(TLS1_3_VERSION, SSL_version(client_ssl));
+  ASSERT_EQ(TLS1_3_VERSION, SSL_version(server_ssl));
+
+  // The code block below passes the data produced by the server's final TLS
+  // handshake message to the client side. It's not application data: it
+  // should be passed by the encrypted/raw side of the communication channel,
+  // not via the SSL_{read,write}() API. In case of a regular over-the-network
+  // TLS negotiation, this data would get to the client side over the wire along
+  // with the encrypted data of the very first response sent by the server to
+  // the client. However, since Kudu RPC connection negotiation works in a
+  // step-by-step manner and runs on top of the SASL framework, this scenario
+  // emulates that by moving the data to the read BIO of the client-side SSL
+  // object. Once the data is in the buffer of the appropriate BIO, it will be
+  // pushed through and processed by the TLS engine with the next chunk of
+  // encrypted application data received by the client.
+  {
+    BIO* rbio = SSL_get_rbio(client_ssl);
+    const int bytes_written = BIO_write(rbio, buf1.data(), buf1.size());
+    ASSERT_EQ(buf1.size(), static_cast<size_t>(bytes_written));
+    ASSERT_EQ(buf1.size(), BIO_ctrl_pending(rbio));
+  }
+
+  // The TLS handshake is now complete: both sides can send and receive data
+  // using OpenSSL's API. In other words, it's possible to use SSL_{read,write}
+  // to successfully send application data from the client to the server and
+  // back.
+  {
+    // The sequence of messages in this sub-scenario matches those produced by
+    // the TestRpc.TestCall/TCP_SSL scenario in src/kudu/rpc/rpc-test.cc.
+    const string client_msg_0 = "0123456789012345";
+    NO_FATALS(Write(client_ssl, client_msg_0));
+    const string client_msg_1 = "01234567890123456789012";
+    NO_FATALS(Write(client_ssl, client_msg_1));
+
+    NO_FATALS(Transfer(client_ssl, server_ssl));
+    NO_FATALS(ReadAndCompare(server_ssl, "0123"));
+    NO_FATALS(ReadAndCompare(server_ssl, "45678901234501234567890123456789012"));
+
+    const string server_msg_0 = "0123456789012345";
+    NO_FATALS(Write(server_ssl, server_msg_0));
+    const string server_msg_1 = "012";
+    NO_FATALS(Write(server_ssl, server_msg_1));
+
+    NO_FATALS(Transfer(server_ssl, client_ssl));
+    NO_FATALS(ReadAndCompare(client_ssl, "0123"));
+    NO_FATALS(ReadAndCompare(client_ssl, "456789012345012"));
+  }
+}
+#endif // #if OPENSSL_VERSION_NUMBER >= 0x10101000L ...
+
 // Tests that the TlsContext can transition from self signed cert to signed
 // cert, and that it rejects invalid certs along the way. We are testing this
 // here instead of in a dedicated TlsContext test because it requires completing
diff --git a/src/kudu/security/tls_handshake.cc b/src/kudu/security/tls_handshake.cc
index 96e3b88..90673e7 100644
--- a/src/kudu/security/tls_handshake.cc
+++ b/src/kudu/security/tls_handshake.cc
@@ -21,8 +21,10 @@
 #include <openssl/ssl.h>
 #include <openssl/x509.h>
 
+#include <cstdint>
 #include <memory>
 #include <string>
+#include <ostream>
 #include <utility>
 
 #include "kudu/gutil/strings/strip.h"
@@ -38,6 +40,10 @@
 #include "kudu/security/x509_check_host.h"
 #endif // OPENSSL_VERSION_NUMBER
 
+#ifndef TLS1_3_VERSION
+#define TLS1_3_VERSION 0x0304
+#endif
+
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
@@ -127,6 +133,11 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
   DCHECK(ssl_);
   auto* ssl = ssl_.get();
 
+  // +------+                       +-----+
+  // |      |--> BIO_write(rbio) -->|     |--> SSL_read(ssl)  --> IN
+  // | SASL |                       | SSL |
+  // |      |<--  BIO_read(wbio) <--|     |<-- SSL_write(ssl) <-- OUT
+  // +------+                       +-----+
   BIO* rbio = SSL_get_rbio(ssl);
   int n = BIO_write(rbio, recv.data(), recv.size());
   DCHECK(n == recv.size() || (n == -1 && recv.empty()));
@@ -144,23 +155,57 @@ Status TlsHandshake::Continue(const string& recv, string* send) {
     // the ERR error queue, so no need to ERR_clear_error() here.
   }
 
-  BIO* wbio = SSL_get_wbio(ssl_.get());
+  BIO* wbio = SSL_get_wbio(ssl);
   int pending = BIO_ctrl_pending(wbio);
+  DCHECK_GE(pending, 0);
 
   send->resize(pending);
-  BIO_read(wbio, &(*send)[0], send->size());
-  DCHECK_EQ(BIO_ctrl_pending(wbio), 0);
+  if (pending > 0) {
+    int bytes_read = BIO_read(wbio, &(*send)[0], send->size());
+    DCHECK_EQ(bytes_read, send->size());
+    DCHECK_EQ(BIO_ctrl_pending(wbio), 0);
+  }
 
   if (rc == 1) {
-    // The handshake is done, but in the case of the server, we still need to
-    // send the final response to the client.
-    DCHECK_GE(send->size(), 0);
+    // SSL_do_handshake() must have read all the pending data.
+    DCHECK_EQ(0, BIO_ctrl_pending(rbio));
+    VLOG(2) << Substitute("TSL Handshake complete");
     return Status::OK();
   }
   return Status::Incomplete("TLS Handshake incomplete");
 }
 
-Status TlsHandshake::Verify(const Socket& socket) const {
+bool TlsHandshake::NeedsExtraStep(const Status& continue_status,
+                                  const string& token) const {
+  DCHECK(has_started_);
+  DCHECK(ssl_);
+  DCHECK(continue_status.ok() || continue_status.IsIncomplete());
+
+  // An unfinished handshake always calls for one more negotiation step.
+  if (continue_status.IsIncomplete()) {
+    return true;
+  }
+  // Any status other than OK at this point means no extra step.
+  if (!continue_status.ok()) {
+    return false;
+  }
+  const bool has_token = !token.empty();
+  if (type_ == TlsHandshakeType::CLIENT) {
+    return has_token;
+  }
+  // On the server side, the extra step is never needed with TLSv1.3.
+  const bool is_server = (type_ == TlsHandshakeType::SERVER);
+  return is_server && SSL_version(ssl_.get()) != TLS1_3_VERSION && has_token;
+}
+
+void TlsHandshake::StorePendingData(string data) {
+  DCHECK(!data.empty());
+  // Stashing data this way is expected only when TLSv1.3 has been negotiated.
+  DCHECK_EQ(TLS1_3_VERSION, SSL_version(ssl_.get()));
+  rbio_pending_data_ = std::move(data);
+}
+
+Status TlsHandshake::Verify(const Socket& /*socket*/) const {
   SCOPED_OPENSSL_NO_PENDING_ERRORS;
   DCHECK(SSL_is_init_finished(ssl_.get()));
   CHECK(ssl_);
@@ -242,14 +287,45 @@ Status TlsHandshake::Finish(unique_ptr<Socket>* socket) {
   RETURN_NOT_OK(Verify(**socket));
 
   int fd = (*socket)->Release();
+  auto* ssl = ssl_.get();
+
+  // Nothing should be left in the memory-based BIOs when Finish() is called.
+  // Otherwise, the buffered data would be lost because those BIOs are destroyed
+  // when SSL_set_fd() is called below.
+  DCHECK_EQ(0, SSL_pending(ssl));
+
+  BIO* wbio = SSL_get_wbio(ssl);
+  DCHECK_EQ(0, BIO_ctrl_pending(wbio));
+  DCHECK_EQ(0, BIO_ctrl_wpending(wbio));
+
+  BIO* rbio = SSL_get_rbio(ssl);
+  DCHECK_EQ(0, BIO_ctrl_pending(rbio));
+  DCHECK_EQ(0, BIO_ctrl_wpending(rbio));
 
   // Give the socket to the SSL instance. This will automatically free the
   // read and write memory BIO instances.
-  int ret = SSL_set_fd(ssl_.get(), fd);
+  int ret = SSL_set_fd(ssl, fd);
   if (ret != 1) {
     return Status::RuntimeError("TLS handshake error", GetOpenSSLErrors());
   }
 
+  const auto data_size = rbio_pending_data_.size();
+  if (data_size != 0) {
+    int fd = SSL_get_fd(ssl);
+    Socket sock(fd);
+    uint8_t* data = reinterpret_cast<uint8_t*>(rbio_pending_data_.data());
+    int32_t written = 0;
+    RETURN_NOT_OK(sock.Write(data, data_size, &written));
+    if (written != data_size) {
+      // The socket should be in blocking mode, so Write() should return with
+      // success only if all the data is written.
+      return Status::IllegalState(
+          Substitute("wrote only $0 out of $1 bytes", written, data_size));
+    }
+    sock.Release(); // do not close the descriptor when Socket goes out of scope
+    rbio_pending_data_.clear();
+  }
+
   // Transfer the SSL instance to the socket.
   socket->reset(new TlsSocket(fd, std::move(ssl_)));
 
diff --git a/src/kudu/security/tls_handshake.h b/src/kudu/security/tls_handshake.h
index 70cdc94..d2c17fc 100644
--- a/src/kudu/security/tls_handshake.h
+++ b/src/kudu/security/tls_handshake.h
@@ -105,6 +105,22 @@ class TlsHandshake {
   // Returns any other status code on error.
   Status Continue(const std::string& recv, std::string* send) WARN_UNUSED_RESULT;
 
+  // Whether an extra negotiation step is needed, given 'continue_status' (the
+  // result of the prior Continue() call) and 'token' (presumably its output).
+  bool NeedsExtraStep(const Status& continue_status,
+                      const std::string& token) const;
+
+  // This method is used to store the data produced by the server's final TLS
+  // handshake message. It's not application data and should be passed by the
+  // encrypted/raw side of the communication channel, not via the regular
+  // SSL_{read,write}() API. Once stored, the data is written to the underlying
+  // socket by the Finish() method upon transitioning from memory-based BIOs
+  // to the socket-based BIOs when the negotiation is complete. Once written
+  // to the socket, it is sent over the wire along with the next chunk of
+  // encrypted data sent by the server to the client, preceding the server's
+  // encrypted response.
+  void StorePendingData(std::string data);
+
   // Finishes the handshake, wrapping the provided socket in the negotiated TLS
   // channel. This 'TlsHandshake' instance should not be used again after
   // calling this.
@@ -139,7 +155,8 @@ class TlsHandshake {
   std::string GetCipherDescription() const;
 
  private:
-  FRIEND_TEST(TestTlsHandshake, TestHandshakeSequence);
+  FRIEND_TEST(TestTlsHandshake, HandshakeSequenceNoTLSv1dot3);
+  FRIEND_TEST(TestTlsHandshake, HandshakeSequenceTLSv1dot3);
 
   // Set the verification mode on the underlying SSL object.
   void SetSSLVerify();
@@ -165,6 +182,10 @@ class TlsHandshake {
 
   Cert local_cert_;
   Cert remote_cert_;
+
+  // Data handed over via StorePendingData() after completing the TLS handshake;
+  // Finish() writes it to the underlying socket and then clears this buffer.
+  std::string rbio_pending_data_;
 };
 
 } // namespace security
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 7e7a546..a10ac75 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -42,6 +42,7 @@
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
@@ -158,18 +159,42 @@ TAG_FLAG(rpc_listen_on_unix_domain_socket, experimental);
 
 DEFINE_string(rpc_tls_ciphers,
               kudu::security::SecurityDefaults::kDefaultTlsCiphers,
-              "The cipher suite preferences to use for TLS-secured RPC connections. "
-              "Uses the OpenSSL cipher preference list format. See man (1) ciphers "
-              "for more information.");
+              "TLSv1.2 (and prior) cipher suite preferences to use for "
+              "TLS-secured RPC connections. Uses the OpenSSL cipher preference "
+              "list format for TLSv1.2 and prior TLS protocol versions, "
+              "for customizing TLSv1.3 cipher suites see "
+              "--rpc_tls_ciphersuites flag. See 'man (1) ciphers' for more "
+              "information.");
 TAG_FLAG(rpc_tls_ciphers, advanced);
 
+// The names of the '--rpc_tls_ciphers' and '--rpc_tls_ciphersuites' flags are
+// confusingly similar, but encoding TLS protocol versions into the flag names
+// seemed even worse. The '--rpc_tls_ciphers' flag might be deprecated at some
+// point, once TLSv1.2 is declared obsolete.
+DEFINE_string(rpc_tls_ciphersuites,
+              kudu::security::SecurityDefaults::kDefaultTlsCipherSuites,
+              "TLSv1.3 cipher suite preferences to use for TLS-secured RPC "
+              "connections. Uses the OpenSSL TLSv1.3 ciphersuite format. "
+              "See 'man (1) ciphers' for more information. This flag is "
+              "effective only if Kudu is built with OpenSSL v1.1.1 or newer.");
+TAG_FLAG(rpc_tls_ciphersuites, advanced);
+
 DEFINE_string(rpc_tls_min_protocol,
               kudu::security::SecurityDefaults::kDefaultTlsMinVersion,
               "The minimum protocol version to allow when for securing RPC "
-              "connections with TLS. May be one of 'TLSv1', 'TLSv1.1', or "
-              "'TLSv1.2'.");
+              "connections with TLS. May be one of 'TLSv1', 'TLSv1.1', "
+              "'TLSv1.2', 'TLSv1.3'.");
 TAG_FLAG(rpc_tls_min_protocol, advanced);
 
+DEFINE_string(rpc_tls_excluded_protocols, "",
+              "A comma-separated list of TLS protocol versions to exclude from "
+              "the set of protocols advertised by the server when securing "
+              "RPC connections with TLS. An empty string means the set of "
+              "available TLS protocol versions is defined by the OpenSSL "
+              "library and --rpc_tls_min_protocol flag.");
+TAG_FLAG(rpc_tls_excluded_protocols, advanced);
+TAG_FLAG(rpc_tls_excluded_protocols, experimental);
+
 DEFINE_string(rpc_certificate_file, "",
               "Path to a PEM encoded X509 certificate to use for securing RPC "
               "connections with SSL/TLS. If set, '--rpc_private_key_file' and "
@@ -236,10 +261,41 @@ using std::vector;
 using strings::Substitute;
 
 namespace kudu {
+
+bool IsValidTlsProtocolStr(const string& str) {
+  // Accept, ignoring letter case, any of the recognized TLS protocol names.
+  if (iequals(str, "TLSv1.3")) return true;
+  if (iequals(str, "TLSv1.2")) return true;
+  if (iequals(str, "TLSv1.1")) return true;
+  return iequals(str, "TLSv1");
+}
+
 namespace server {
 
 namespace {
 
+// Validator for --rpc_tls_min_protocol: a single known TLS protocol name.
+bool ValidateTlsProtocol(const char* /*flagname*/, const string& value) {
+  return IsValidTlsProtocolStr(value);
+}
+DEFINE_validator(rpc_tls_min_protocol, &ValidateTlsProtocol);
+
+// Validator for --rpc_tls_excluded_protocols: the value is either empty or
+// a comma-separated list where every non-empty element is a known TLS
+// protocol name.
+bool ValidateTlsExcludedProtocols(const char* /*flagname*/,
+                                  const std::string& value) {
+  if (value.empty()) {
+    return true;
+  }
+  vector<string> protos = strings::Split(value, ",", strings::SkipEmpty());
+  for (const auto& proto : protos) {
+    if (!IsValidTlsProtocolStr(proto)) return false;
+  }
+  return true;
+}
+DEFINE_validator(rpc_tls_excluded_protocols, &ValidateTlsExcludedProtocols);
+
 bool ValidateKeytabPermissions() {
   if (!FLAGS_keytab_file.empty() && !FLAGS_allow_world_readable_credentials) {
     bool world_readable_keytab;
@@ -508,6 +564,9 @@ Status ServerBase::Init() {
 
   RETURN_NOT_OK(InitAcls());
 
+  vector<string> rpc_tls_excluded_protocols = strings::Split(
+      FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty());
+
   // Create the Messenger.
   rpc::MessengerBuilder builder(name_);
   builder.set_num_reactors(FLAGS_num_reactor_threads)
@@ -519,6 +578,8 @@ Status ServerBase::Init() {
          .set_rpc_authentication(FLAGS_rpc_authentication)
          .set_rpc_encryption(FLAGS_rpc_encryption)
          .set_rpc_tls_ciphers(FLAGS_rpc_tls_ciphers)
+         .set_rpc_tls_ciphersuites(FLAGS_rpc_tls_ciphersuites)
+         .set_rpc_tls_excluded_protocols(std::move(rpc_tls_excluded_protocols))
          .set_rpc_tls_min_protocol(FLAGS_rpc_tls_min_protocol)
          .set_epki_cert_key_files(FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file)
          .set_epki_certificate_authority_file(FLAGS_rpc_ca_certificate_file)