You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/12/18 20:59:15 UTC

kudu git commit: KUDU-2228: Make Messenger options configurable

Repository: kudu
Updated Branches:
  refs/heads/master 14bf71e73 -> b357fa9b7


KUDU-2228: Make Messenger options configurable

Currently, the RPC layer accesses many gflags directly to take
certain decisions, eg. whether to turn on encryption,
authentication, etc.

Since the RPC layer is to be used more like a library, these should
be configurable options that are passed to the Messenger
(which is the API endpoint for the application using the RPC layer),
instead of the RPC layer itself directly accessing these flags.

This patch converts the following flags to Messenger options and moves
the flag definitions to server_base.cc which is the "application" in
Kudu that uses the Messenger:

FLAGS_rpc_default_keepalive_time_ms
FLAGS_rpc_negotiation_timeout_ms
FLAGS_rpc_authentication
FLAGS_rpc_encryption
FLAGS_rpc_tls_ciphers
FLAGS_rpc_tls_min_protocol
FLAGS_rpc_certificate_file
FLAGS_rpc_private_key_file
FLAGS_rpc_ca_certificate_file
FLAGS_rpc_private_key_password_cmd
FLAGS_keytab_file

Most of the remaining flags are test or benchmark related flags. There
may be a few more flags that can be moved out and converted to options,
but we can leave that as future work if we decide to move them.

Change-Id: Ia21814ffb6e283c2791985b089878b579905f0ba
Reviewed-on: http://gerrit.cloudera.org:8080/8789
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b357fa9b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b357fa9b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b357fa9b

Branch: refs/heads/master
Commit: b357fa9b729ce52627a862e08ebac822ae470bc9
Parents: 14bf71e
Author: Sailesh Mukil <sa...@apache.org>
Authored: Thu Dec 7 03:25:11 2017 -0800
Committer: Dan Burkert <da...@apache.org>
Committed: Mon Dec 18 20:58:56 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/client_negotiation.cc      |   2 -
 src/kudu/rpc/client_negotiation.h       |   1 +
 src/kudu/rpc/messenger.cc               | 253 +++++++++------------------
 src/kudu/rpc/messenger.h                |  86 +++++++--
 src/kudu/rpc/negotiation.cc             |  12 +-
 src/kudu/rpc/negotiation.h              |   7 +-
 src/kudu/rpc/reactor.cc                 |   7 +-
 src/kudu/rpc/rpc-test-base.h            |  33 +++-
 src/kudu/rpc/rpc-test.cc                |  48 +++--
 src/kudu/rpc/sasl_common.cc             |  24 +--
 src/kudu/rpc/sasl_common.h              |   2 +-
 src/kudu/rpc/server_negotiation.cc      |   2 -
 src/kudu/rpc/server_negotiation.h       |   1 +
 src/kudu/security/CMakeLists.txt        |   1 +
 src/kudu/security/init.cc               |  43 +----
 src/kudu/security/init.h                |   3 +
 src/kudu/security/security_flags.cc     |  42 +++++
 src/kudu/security/security_flags.h      |  36 ++++
 src/kudu/security/test/mini_kdc-test.cc |   7 +-
 src/kudu/security/tls_context.cc        |  51 ++----
 src/kudu/security/tls_context.h         |  10 ++
 src/kudu/server/server_base.cc          | 203 ++++++++++++++++++++-
 src/kudu/server/webserver_options.cc    |  12 +-
 src/kudu/util/flags.cc                  |  17 ++
 src/kudu/util/flags.h                   |  11 ++
 25 files changed, 580 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
index 219ad22..71dde92 100644
--- a/src/kudu/rpc/client_negotiation.cc
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -280,8 +280,6 @@ Status ClientNegotiation::SendConnectionHeader() {
 }
 
 Status ClientNegotiation::InitSaslClient() {
-  RETURN_NOT_OK(SaslInit());
-
   // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
   unsigned flags = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
index 19a155c..06fb2b8 100644
--- a/src/kudu/rpc/client_negotiation.h
+++ b/src/kudu/rpc/client_negotiation.h
@@ -33,6 +33,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/util/monotime.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 1c8b5b2..3c69512 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -25,9 +25,6 @@
 #include <type_traits>
 #include <utility>
 
-#include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
@@ -51,14 +48,11 @@
 #include "kudu/security/openssl_util.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token_verifier.h"
-#include "kudu/util/env.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/flag_validators.h"
+#include "kudu/util/flags.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"
 #include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread_restrictions.h"
 #include "kudu/util/threadpool.h"
@@ -68,58 +62,6 @@ using std::shared_ptr;
 using std::make_shared;
 using strings::Substitute;
 
-DEFINE_string(rpc_authentication, "optional",
-              "Whether to require RPC connections to authenticate. Must be one "
-              "of 'disabled', 'optional', or 'required'. If 'optional', "
-              "authentication will be used when the remote end supports it. If "
-              "'required', connections which are not able to authenticate "
-              "(because the remote end lacks support) are rejected. Secure "
-              "clusters should use 'required'.");
-DEFINE_string(rpc_encryption, "optional",
-              "Whether to require RPC connections to be encrypted. Must be one "
-              "of 'disabled', 'optional', or 'required'. If 'optional', "
-              "encryption will be used when the remote end supports it. If "
-              "'required', connections which are not able to use encryption "
-              "(because the remote end lacks support) are rejected. If 'disabled', "
-              "encryption will not be used, and RPC authentication "
-              "(--rpc_authentication) must also be disabled as well. "
-              "Secure clusters should use 'required'.");
-TAG_FLAG(rpc_authentication, evolving);
-TAG_FLAG(rpc_encryption, evolving);
-
-DEFINE_string(rpc_certificate_file, "",
-              "Path to a PEM encoded X509 certificate to use for securing RPC "
-              "connections with SSL/TLS. If set, '--rpc_private_key_file' and "
-              "'--rpc_ca_certificate_file' must be set as well.");
-DEFINE_string(rpc_private_key_file, "",
-              "Path to a PEM encoded private key paired with the certificate "
-              "from '--rpc_certificate_file'");
-DEFINE_string(rpc_ca_certificate_file, "",
-              "Path to the PEM encoded X509 certificate of the trusted external "
-              "certificate authority. The provided certificate should be the root "
-              "issuer of the certificate passed in '--rpc_certificate_file'.");
-DEFINE_string(rpc_private_key_password_cmd, "", "A Unix command whose output "
-              "returns the password used to decrypt the RPC server's private key "
-              "file specified in --rpc_private_key_file. If the .PEM key file is "
-              "not password-protected, this flag does not need to be set. "
-              "Trailing whitespace will be trimmed before it is used to decrypt "
-              "the private key.");
-
-// Setting TLS certs and keys via CLI flags is only necessary for external
-// PKI-based security, which is not yet production ready. Instead, see
-// internal PKI (ipki) and Kerberos-based authentication.
-TAG_FLAG(rpc_certificate_file, experimental);
-TAG_FLAG(rpc_private_key_file, experimental);
-TAG_FLAG(rpc_ca_certificate_file, experimental);
-
-DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
-             "If an RPC connection from a client is idle for this amount of time, the server "
-             "will disconnect the client.");
-TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
-
-DECLARE_string(keytab_file);
-DECLARE_bool(allow_world_readable_credentials);
-
 namespace boost {
 template <typename Signature> class function;
 }
@@ -127,112 +69,19 @@ template <typename Signature> class function;
 namespace kudu {
 namespace rpc {
 
-template <typename T>
-static Status ParseTriState(const char* flag_name, const string& flag_value, T* tri_state) {
-  if (boost::iequals(flag_value, "required")) {
-    *tri_state = T::REQUIRED;
-  } else if (boost::iequals(flag_value, "optional")) {
-    *tri_state = T::OPTIONAL;
-  } else if (boost::iequals(flag_value, "disabled")) {
-    *tri_state = T::DISABLED;
-  } else {
-    return Status::InvalidArgument(Substitute(
-          "$0 flag must be one of 'required', 'optional', or 'disabled'",
-          flag_name));
-  }
-  return Status::OK();
-}
-
-static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) {
-  RpcAuthentication result;
-  Status s = ParseTriState(flag_name, flag_value, &result);
-  if (!s.ok()) {
-    LOG(ERROR) << s.message().ToString();
-    return false;
-  }
-  return true;
-}
-DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication);
-
-static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) {
-  RpcEncryption result;
-  Status s = ParseTriState(flag_name, flag_value, &result);
-  if (!s.ok()) {
-    LOG(ERROR) << s.message().ToString();
-    return false;
-  }
-  return true;
-}
-DEFINE_validator(rpc_encryption, &ValidateRpcEncryption);
-
-static bool ValidateRpcAuthnFlags() {
-  RpcAuthentication authentication;
-  CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication));
-
-  RpcEncryption encryption;
-  CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption));
-
-  if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) {
-    LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled "
-                  "if RPC encryption (--rpc_encryption) is disabled";
-    return false;
-  }
-
-  const bool has_keytab = !FLAGS_keytab_file.empty();
-  const bool has_cert = !FLAGS_rpc_certificate_file.empty();
-  if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) {
-    LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be "
-                  "required unless Kerberos (--keytab_file) or external PKI "
-                  "(--rpc_certificate_file et al) are configured";
-    return false;
-  }
-
-  return true;
-}
-GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags);
-
-static bool ValidateExternalPkiFlags() {
-  bool has_cert = !FLAGS_rpc_certificate_file.empty();
-  bool has_key = !FLAGS_rpc_private_key_file.empty();
-  bool has_ca = !FLAGS_rpc_ca_certificate_file.empty();
-
-  if (has_cert != has_key || has_cert != has_ca) {
-    LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and "
-                  "--rpc_ca_certificate_file flags must be set as a group; "
-                  "i.e. either set all or none of them.";
-    return false;
-  }
-
-  if (has_key && !FLAGS_allow_world_readable_credentials) {
-    bool world_readable_private_key;
-    Status s = Env::Default()->IsFileWorldReadable(FLAGS_rpc_private_key_file,
-                                                   &world_readable_private_key);
-    if (!s.ok()) {
-      LOG(ERROR) << Substitute("$0: could not verify private key file does not have "
-                               "world-readable permissions: $1",
-                               FLAGS_rpc_private_key_file, s.ToString());
-      return false;
-    }
-    if (world_readable_private_key) {
-      LOG(ERROR) << "cannot use private key file with world-readable permissions: "
-                 << FLAGS_rpc_private_key_file;
-      return false;
-    }
-  }
-
-  return true;
-}
-GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags);
-
 MessengerBuilder::MessengerBuilder(std::string name)
     : name_(std::move(name)),
-      connection_keepalive_time_(
-          MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)),
+      connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
       num_reactors_(4),
       min_negotiation_threads_(0),
       max_negotiation_threads_(4),
       coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
+      rpc_negotiation_timeout_ms_(3000),
       sasl_proto_name_("kudu"),
+      rpc_authentication_("optional"),
+      rpc_encryption_("optional"),
+      rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
       enable_inbound_tls_(false) {
 }
 
@@ -267,8 +116,61 @@ MessengerBuilder &MessengerBuilder::set_metric_entity(
   return *this;
 }
 
-MessengerBuilder &MessengerBuilder::set_sasl_proto_name(std::string sasl_proto_name) {
-  sasl_proto_name_ = std::move(sasl_proto_name);
+MessengerBuilder &MessengerBuilder::set_connection_keep_alive_time(int32_t time_in_ms) {
+  connection_keepalive_time_ = MonoDelta::FromMilliseconds(time_in_ms);
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_negotiation_timeout_ms(int64_t time_in_ms) {
+  rpc_negotiation_timeout_ms_ = time_in_ms;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_sasl_proto_name(const std::string& sasl_proto_name) {
+  sasl_proto_name_ = sasl_proto_name;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_authentication(const std::string& rpc_authentication) {
+  rpc_authentication_ = rpc_authentication;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_encryption(const std::string& rpc_encryption) {
+  rpc_encryption_ = rpc_encryption;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers) {
+  rpc_tls_ciphers_ = rpc_tls_ciphers;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_min_protocol(
+    const std::string& rpc_tls_min_protocol) {
+  rpc_tls_min_protocol_ = rpc_tls_min_protocol;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_cert_key_files(
+    const std::string& cert, const std::string& private_key) {
+  rpc_certificate_file_ = cert;
+  rpc_private_key_file_ = private_key;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_certificate_authority_file(const std::string& ca) {
+  rpc_ca_certificate_file_ = ca;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_private_password_key_cmd(const std::string& cmd) {
+  rpc_private_key_password_cmd_ = cmd;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_keytab_file(const std::string& keytab_file) {
+  keytab_file_ = keytab_file;
   return *this;
 }
 
@@ -278,7 +180,8 @@ MessengerBuilder& MessengerBuilder::enable_inbound_tls() {
 }
 
 Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
-  RETURN_NOT_OK(SaslInit()); // Initialize SASL library before we start making requests
+  // Initialize SASL library before we start making requests
+  RETURN_NOT_OK(SaslInit(!keytab_file_.empty()));
 
   Messenger* new_msgr(new Messenger(*this));
 
@@ -287,34 +190,34 @@ Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
   });
 
   RETURN_NOT_OK(ParseTriState("--rpc_authentication",
-                              FLAGS_rpc_authentication,
+                              rpc_authentication_,
                               &new_msgr->authentication_));
 
   RETURN_NOT_OK(ParseTriState("--rpc_encryption",
-                              FLAGS_rpc_encryption,
+                              rpc_encryption_,
                               &new_msgr->encryption_));
 
   RETURN_NOT_OK(new_msgr->Init());
   if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
     auto* tls_context = new_msgr->mutable_tls_context();
 
-    if (!FLAGS_rpc_certificate_file.empty()) {
-      CHECK(!FLAGS_rpc_private_key_file.empty());
-      CHECK(!FLAGS_rpc_ca_certificate_file.empty());
+    if (!rpc_certificate_file_.empty()) {
+      CHECK(!rpc_private_key_file_.empty());
+      CHECK(!rpc_ca_certificate_file_.empty());
 
       // TODO(KUDU-1920): should we try and enforce that the server
       // is in the subject or alt names of the cert?
-      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(FLAGS_rpc_ca_certificate_file));
-      if (FLAGS_rpc_private_key_password_cmd.empty()) {
-        RETURN_NOT_OK(tls_context->LoadCertificateAndKey(FLAGS_rpc_certificate_file,
-                                                         FLAGS_rpc_private_key_file));
+      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(rpc_ca_certificate_file_));
+      if (rpc_private_key_password_cmd_.empty()) {
+        RETURN_NOT_OK(tls_context->LoadCertificateAndKey(rpc_certificate_file_,
+                                                         rpc_private_key_file_));
       } else {
         RETURN_NOT_OK(tls_context->LoadCertificateAndPasswordProtectedKey(
-            FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file,
+            rpc_certificate_file_, rpc_private_key_file_,
             [&](){
               string ret;
               WARN_NOT_OK(security::GetPasswordFromShellCommand(
-                  FLAGS_rpc_private_key_password_cmd, &ret),
+                  rpc_private_key_password_cmd_, &ret),
                   "could not get RPC password from configured command");
               return ret;
             }
@@ -401,7 +304,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr,
   // Before listening, if we expect to require Kerberos, we want to verify
   // that everything is set up correctly. This way we'll generate errors on
   // startup rather than later on when we first receive a client connection.
-  if (!FLAGS_keytab_file.empty()) {
+  if (!keytab_file_.empty()) {
     RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()),
                           "GSSAPI/Kerberos not properly configured");
   }
@@ -493,11 +396,13 @@ Messenger::Messenger(const MessengerBuilder &bld)
     closing_(false),
     authentication_(RpcAuthentication::REQUIRED),
     encryption_(RpcEncryption::REQUIRED),
-    tls_context_(new security::TlsContext()),
+    tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
     token_verifier_(new security::TokenVerifier()),
     rpcz_store_(new RpczStore()),
     metric_entity_(bld.metric_entity_),
+    rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
     sasl_proto_name_(bld.sasl_proto_name_),
+    keytab_file_(bld.keytab_file_),
     retain_self_(this) {
   for (int i = 0; i < bld.num_reactors_; i++) {
     reactors_.push_back(new Reactor(retain_self_, i, bld));

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 7f66ab2..6a0581b 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -31,6 +31,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/connection.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
@@ -55,6 +56,9 @@ class TokenVerifier;
 
 namespace rpc {
 
+using security::RpcAuthentication;
+using security::RpcEncryption;
+
 class AcceptorPool;
 class DumpRunningRpcsRequestPB;
 class DumpRunningRpcsResponsePB;
@@ -78,20 +82,6 @@ struct AcceptorPoolInfo {
   Sockaddr bind_address_;
 };
 
-// Authentication configuration for RPC connections.
-enum class RpcAuthentication {
-  DISABLED,
-  OPTIONAL,
-  REQUIRED,
-};
-
-// Encryption configuration for RPC connections.
-enum class RpcEncryption {
-  DISABLED,
-  OPTIONAL,
-  REQUIRED,
-};
-
 // Used to construct a Messenger.
 class MessengerBuilder {
  public:
@@ -121,8 +111,54 @@ class MessengerBuilder {
   // Set metric entity for use by RPC systems.
   MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);
 
+  // Set the time in milliseconds after which an idle connection from a client will be
+  // disconnected by the server.
+  MessengerBuilder &set_connection_keep_alive_time(int32_t time_in_ms);
+
+  // Set the timeout for negotiating an RPC connection.
+  MessengerBuilder &set_rpc_negotiation_timeout_ms(int64_t time_in_ms);
+
   // Set the SASL protocol name that is used for the SASL negotiation.
-  MessengerBuilder &set_sasl_proto_name(std::string sasl_proto_name);
+  MessengerBuilder &set_sasl_proto_name(const std::string& sasl_proto_name);
+
+  // Set the state of authentication required. If 'optional', authentication will be used when
+  // the remote end supports it. If 'required', connections which are not able to authenticate
+  // (because the remote end lacks support) are rejected.
+  MessengerBuilder &set_rpc_authentication(const std::string& rpc_authentication);
+
+  // Set the state of encryption required. If 'optional', encryption will be used when the
+  // remote end supports it. If 'required', connections which are not able to use encryption
+  // (because the remote end lacks support) are rejected. If 'disabled', encryption will not
+  // be used, and RPC authentication (--rpc_authentication) must also be disabled as well.
+  MessengerBuilder &set_rpc_encryption(const std::string& rpc_encryption);
+
+  // Set the cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  MessengerBuilder &set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers);
+
+  // Set the minimum protocol version to allow when for securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  MessengerBuilder &set_rpc_tls_min_protocol(const std::string& rpc_tls_min_protocol);
+
+  // Set the TLS server certificate and private key files paths. If this is set in conjunction
+  // with enable_inbound_tls(), internal PKI will not be used for encrypted communication and
+  // external PKI will be used instead.
+  MessengerBuilder &set_epki_cert_key_files(
+      const std::string& cert, const std::string& private_key);
+
+  // Set the TLS Certificate Authority file path. Must always be set with set_epki_cert_key_files().
+  // If this is set in conjunction with enable_inbound_tls(), internal PKI will not be used for
+  // encrypted communication and external PKI will be used instead.
+  MessengerBuilder &set_epki_certificate_authority_file(const std::string& ca);
+
+  // Set a Unix command whose output returns the password used to decrypt the RPC server's private
+  // key file specified via set_epki_cert_key_files(). If the .PEM key file is not
+  // password-protected, this flag does not need to be set. Trailing whitespace will be trimmed
+  // before it is used to decrypt the private key.
+  MessengerBuilder &set_epki_private_password_key_cmd(const std::string& cmd);
+
+  // Set the path to the Kerberos Keytab file for this server.
+  MessengerBuilder &set_keytab_file(const std::string& keytab_file);
 
   // Configure the messenger to enable TLS encryption on inbound connections.
   MessengerBuilder& enable_inbound_tls();
@@ -137,7 +173,17 @@ class MessengerBuilder {
   int max_negotiation_threads_;
   MonoDelta coarse_timer_granularity_;
   scoped_refptr<MetricEntity> metric_entity_;
+  int64_t rpc_negotiation_timeout_ms_;
   std::string sasl_proto_name_;
+  std::string rpc_authentication_;
+  std::string rpc_encryption_;
+  std::string rpc_tls_ciphers_;
+  std::string rpc_tls_min_protocol_;
+  std::string rpc_certificate_file_;
+  std::string rpc_private_key_file_;
+  std::string rpc_ca_certificate_file_;
+  std::string rpc_private_key_password_cmd_;
+  std::string keytab_file_;
   bool enable_inbound_tls_;
 };
 
@@ -264,10 +310,14 @@ class Messenger {
 
   scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_.get(); }
 
+  const int64_t rpc_negotiation_timeout_ms() const { return rpc_negotiation_timeout_ms_; }
+
   const std::string& sasl_proto_name() const {
     return sasl_proto_name_;
   }
 
+  const std::string& keytab_file() const { return keytab_file_; }
+
   const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const;
 
  private:
@@ -338,9 +388,15 @@ class Messenger {
 
   scoped_refptr<MetricEntity> metric_entity_;
 
+  // Timeout in milliseconds after which an incomplete connection negotiation will timeout.
+  const int64_t rpc_negotiation_timeout_ms_;
+
   // The SASL protocol name that is used for the SASL negotiation.
   const std::string sasl_proto_name_;
 
+  // Path to the Kerberos Keytab file for this server.
+  const std::string keytab_file_;
+
   // The ownership of the Messenger object is somewhat subtle. The pointer graph
   // looks like this:
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index 92f5e1d..d85ec5d 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -29,7 +29,6 @@
 
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/port.h"
@@ -65,9 +64,6 @@ DEFINE_int32(rpc_negotiation_inject_delay_ms, 0,
              "the RPC negotiation process on the server side.");
 TAG_FLAG(rpc_negotiation_inject_delay_ms, unsafe);
 
-DECLARE_string(keytab_file);
-DECLARE_string(rpc_certificate_file);
-
 DEFINE_bool(rpc_encrypt_loopback_connections, false,
             "Whether to encrypt data transfer on RPC connections that stay within "
             "a single host. Encryption here is likely to offer no additional "
@@ -238,9 +234,10 @@ static Status DoServerNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
                                   RpcEncryption encryption,
                                   const MonoTime& deadline) {
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   if (authentication == RpcAuthentication::REQUIRED &&
-      FLAGS_keytab_file.empty() &&
-      FLAGS_rpc_certificate_file.empty()) {
+      messenger->keytab_file().empty() &&
+      !messenger->tls_context().is_external_cert()) {
     return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be "
                                    "required unless Kerberos (--keytab_file) or external PKI "
                                    "(--rpc_certificate_file et al) are configured");
@@ -253,14 +250,13 @@ static Status DoServerNegotiation(Connection* conn,
   }
 
   // Create a new ServerNegotiation to handle the synchronous negotiation.
-  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   ServerNegotiation server_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
                                        &messenger->token_verifier(),
                                        encryption,
                                        messenger->sasl_proto_name());
 
-  if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) {
+  if (authentication != RpcAuthentication::DISABLED && !messenger->keytab_file().empty()) {
     RETURN_NOT_OK(server_negotiation.EnableGSSAPI());
   }
   if (authentication != RpcAuthentication::REQUIRED) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.h b/src/kudu/rpc/negotiation.h
index fb08d6d..b25ed0e 100644
--- a/src/kudu/rpc/negotiation.h
+++ b/src/kudu/rpc/negotiation.h
@@ -21,6 +21,7 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/security/security_flags.h"
 
 namespace kudu {
 
@@ -29,8 +30,6 @@ class MonoTime;
 namespace rpc {
 
 class Connection;
-enum class RpcAuthentication;
-enum class RpcEncryption;
 
 enum class AuthenticationType {
   INVALID,
@@ -47,8 +46,8 @@ class Negotiation {
 
   // Perform negotiation for a connection (either server or client)
   static void RunNegotiation(const scoped_refptr<Connection>& conn,
-                             RpcAuthentication authentication,
-                             RpcEncryption encryption,
+                             security::RpcAuthentication authentication,
+                             security::RpcEncryption encryption,
                              MonoTime deadline);
  private:
   DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 617d23d..7f13cb8 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -69,11 +69,6 @@ using std::shared_ptr;
 using std::unique_ptr;
 using strings::Substitute;
 
-DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
-             "Timeout for negotiating an RPC connection.");
-TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
-TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
-
 DEFINE_bool(rpc_reopen_outbound_connections, false,
             "Open a new connection to the server for every RPC call. "
             "If not enabled, an already existing connection to a "
@@ -572,7 +567,7 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
 
   // Set a limit on how long the server will negotiate with a new client.
   MonoTime deadline = MonoTime::Now() +
-      MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms);
+      MonoDelta::FromMilliseconds(reactor()->messenger()->rpc_negotiation_timeout_ms());
 
   scoped_refptr<Trace> trace(new Trace());
   ADOPT_TRACE(trace.get());

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 41afff9..53ba71d 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -430,10 +430,17 @@ class RpcTestBase : public KuduTest {
  protected:
   std::shared_ptr<Messenger> CreateMessenger(const std::string &name,
                                              int n_reactors = 1,
-                                             bool enable_ssl = false) {
+                                             bool enable_ssl = false,
+                                             const std::string& rpc_certificate_file = "",
+                                             const std::string& rpc_private_key_file = "",
+                                             const std::string& rpc_ca_certificate_file = "",
+                                             const std::string& rpc_private_key_password_cmd = "") {
     MessengerBuilder bld(name);
 
     if (enable_ssl) {
+      bld.set_epki_cert_key_files(rpc_certificate_file, rpc_private_key_file);
+      bld.set_epki_certificate_authority_file(rpc_ca_certificate_file);
+      bld.set_epki_private_password_key_cmd(rpc_private_key_password_cmd);
       bld.enable_inbound_tls();
     }
 
@@ -554,8 +561,14 @@ class RpcTestBase : public KuduTest {
     LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds();
   }
 
-  void StartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
-    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl);
+  void StartTestServer(Sockaddr *server_addr,
+                       bool enable_ssl = false,
+                       const std::string& rpc_certificate_file = "",
+                       const std::string& rpc_private_key_file = "",
+                       const std::string& rpc_ca_certificate_file = "",
+                       const std::string& rpc_private_key_password_cmd = "") {
+    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, rpc_certificate_file,
+        rpc_private_key_file, rpc_ca_certificate_file, rpc_private_key_password_cmd);
   }
 
   void StartTestServerWithGeneratedCode(Sockaddr *server_addr, bool enable_ssl = false) {
@@ -564,7 +577,7 @@ class RpcTestBase : public KuduTest {
 
   void StartTestServerWithCustomMessenger(Sockaddr *server_addr,
       const std::shared_ptr<Messenger>& messenger, bool enable_ssl = false) {
-    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, messenger);
+    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, "", "", "", "", messenger);
   }
 
   // Start a simple socket listening on a local port, returning the address.
@@ -590,11 +603,17 @@ class RpcTestBase : public KuduTest {
   }
 
   template<class ServiceClass>
-  void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false,
-      const std::shared_ptr<Messenger>& messenger = nullptr) {
+  void DoStartTestServer(Sockaddr *server_addr,
+                         bool enable_ssl = false,
+                         const std::string& rpc_certificate_file = "",
+                         const std::string& rpc_private_key_file = "",
+                         const std::string& rpc_ca_certificate_file = "",
+                         const std::string& rpc_private_key_password_cmd = "",
+                         const std::shared_ptr<Messenger>& messenger = nullptr) {
     if (!messenger) {
       server_messenger_ =
-          CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+          CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl, rpc_certificate_file,
+              rpc_private_key_file, rpc_ca_certificate_file, rpc_private_key_password_cmd);
     } else {
       server_messenger_ = messenger;
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index aa92c13..d3360f8 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -71,10 +71,6 @@ METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
 DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
-DECLARE_string(rpc_certificate_file);
-DECLARE_string(rpc_ca_certificate_file);
-DECLARE_string(rpc_private_key_file);
-DECLARE_string(rpc_private_key_password_cmd);
 
 using std::shared_ptr;
 using std::string;
@@ -189,17 +185,22 @@ TEST_P(TestRpc, TestCallWithChainCerts) {
   // We're only interested in running this test with TLS enabled.
   if (!enable_ssl) return;
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
   ASSERT_OK(security::CreateTestSSLCertSignedByChain(GetTestDataDirectory(),
-                                                     &FLAGS_rpc_certificate_file,
-                                                     &FLAGS_rpc_private_key_file,
-                                                     &FLAGS_rpc_ca_certificate_file));
+                                                     &rpc_certificate_file,
+                                                     &rpc_private_key_file,
+                                                     &rpc_ca_certificate_file));
   // Set up server.
   Sockaddr server_addr;
   StartTestServer(&server_addr, enable_ssl);
 
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
-  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl,
+      rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file));
+
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
@@ -216,20 +217,26 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
   // We're only interested in running this test with TLS enabled.
   if (!enable_ssl) return;
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
+  string rpc_private_key_password_cmd;
   string passwd;
   ASSERT_OK(security::CreateTestSSLCertWithEncryptedKey(GetTestDataDirectory(),
-                                                       &FLAGS_rpc_certificate_file,
-                                                       &FLAGS_rpc_private_key_file,
+                                                       &rpc_certificate_file,
+                                                       &rpc_private_key_file,
                                                        &passwd));
-  FLAGS_rpc_ca_certificate_file = FLAGS_rpc_certificate_file;
-  FLAGS_rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
+  rpc_ca_certificate_file = rpc_certificate_file;
+  rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
   // Set up server.
   Sockaddr server_addr;
   StartTestServer(&server_addr, enable_ssl);
 
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
-  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl,
+      rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file,
+      rpc_private_key_password_cmd));
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
@@ -247,18 +254,23 @@ TEST_P(TestRpc, TestCallWithBadPasswordProtectedKey) {
   if (!enable_ssl) return;
   ::testing::FLAGS_gtest_death_test_style = "threadsafe";
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
+  string rpc_private_key_password_cmd;
   string passwd;
   CHECK_OK(security::CreateTestSSLCertWithEncryptedKey(GetTestDataDirectory(),
-                                                       &FLAGS_rpc_certificate_file,
-                                                       &FLAGS_rpc_private_key_file,
+                                                       &rpc_certificate_file,
+                                                       &rpc_private_key_file,
                                                        &passwd));
   // Overwrite the password with an invalid one.
   passwd = "badpassword";
-  FLAGS_rpc_ca_certificate_file = FLAGS_rpc_certificate_file;
-  FLAGS_rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
+  rpc_ca_certificate_file = rpc_certificate_file;
+  rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
   // Verify that the server fails to start up.
   Sockaddr server_addr;
-  ASSERT_DEATH(StartTestServer(&server_addr, enable_ssl), "failed to load private key file");
+  ASSERT_DEATH(StartTestServer(&server_addr, enable_ssl, rpc_certificate_file, rpc_private_key_file,
+      rpc_ca_certificate_file, rpc_private_key_password_cmd), "failed to load private key file");
 }
 
 // Test that connecting to an invalid server properly throws an error.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.cc b/src/kudu/rpc/sasl_common.cc
index a2da58c..a377d16 100644
--- a/src/kudu/rpc/sasl_common.cc
+++ b/src/kudu/rpc/sasl_common.cc
@@ -26,14 +26,12 @@
 #include <string>
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <regex.h>
 #include <sasl/sasl.h>
 #include <sasl/saslplug.h>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/once.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/security/init.h"
 #include "kudu/util/mutex.h"
@@ -43,8 +41,6 @@
 using std::set;
 using std::string;
 
-DECLARE_string(keytab_file);
-
 namespace kudu {
 namespace rpc {
 
@@ -62,6 +58,9 @@ static bool sasl_is_initialized = false;
 // If true, then we expect someone else has initialized SASL.
 static bool g_disable_sasl_init = false;
 
+// If true, we expect kerberos to be enabled.
+static bool has_kerberos_keytab = false;
+
 // Output Sasl messages.
 // context: not used.
 // level: logging level.
@@ -211,9 +210,11 @@ static bool SaslMutexImplementationProvided() {
 
 // Actually perform the initialization for the SASL subsystem.
 // Meant to be called via GoogleOnceInit().
-static void DoSaslInit() {
+static void DoSaslInit(bool kerberos_keytab_provided) {
   VLOG(3) << "Initializing SASL library";
 
+  has_kerberos_keytab = kerberos_keytab_provided;
+
   bool sasl_initialized = SaslIsInitialized();
   if (sasl_initialized && !g_disable_sasl_init) {
     LOG(WARNING) << "SASL was initialized prior to Kudu's initialization. Skipping "
@@ -264,10 +265,12 @@ Status DisableSaslInitialization() {
   return Status::OK();
 }
 
-Status SaslInit() {
+Status SaslInit(bool kerberos_keytab_provided) {
   // Only execute SASL initialization once
-  static GoogleOnceType once = GOOGLE_ONCE_INIT;
-  GoogleOnceInit(&once, &DoSaslInit);
+  static std::once_flag once;
+  std::call_once(once, DoSaslInit, kerberos_keytab_provided);
+  DCHECK_EQ(kerberos_keytab_provided, has_kerberos_keytab);
+
   return sasl_init_status;
 }
 
@@ -319,10 +322,9 @@ Status WrapSaslCall(sasl_conn_t* conn, const std::function<int()>& call) {
   g_auth_failure_capture = &err;
 
   // Take the 'kerberos_reinit_lock' here to avoid a possible race with ticket renewal.
-  bool kerberos_supported = !FLAGS_keytab_file.empty();
-  if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadLock();
+  if (has_kerberos_keytab) kudu::security::KerberosReinitLock()->ReadLock();
   int rc = call();
-  if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadUnlock();
+  if (has_kerberos_keytab) kudu::security::KerberosReinitLock()->ReadUnlock();
   g_auth_failure_capture = nullptr;
 
   switch (rc) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/sasl_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_common.h b/src/kudu/rpc/sasl_common.h
index 3ec43d2..888e7cb 100644
--- a/src/kudu/rpc/sasl_common.h
+++ b/src/kudu/rpc/sasl_common.h
@@ -63,7 +63,7 @@ struct SaslMechanism {
 //
 // This function is thread safe and uses a static lock.
 // This function should NOT be called during static initialization.
-Status SaslInit() WARN_UNUSED_RESULT;
+Status SaslInit(bool kerberos_keytab_provided = false) WARN_UNUSED_RESULT;
 
 // Disable Kudu's initialization of SASL. See equivalent method in client.h.
 Status DisableSaslInitialization() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
index 231a6fe..b12b54b 100644
--- a/src/kudu/rpc/server_negotiation.cc
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -376,8 +376,6 @@ Status ServerNegotiation::ValidateConnectionHeader(faststring* recv_buf) {
 
 // calls sasl_server_init() and sasl_server_new()
 Status ServerNegotiation::InitSaslServer() {
-  RETURN_NOT_OK(SaslInit());
-
   // TODO(unknown): Support security flags.
   unsigned secflags = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
index 2255274..2582af1 100644
--- a/src/kudu/rpc/server_negotiation.h
+++ b/src/kudu/rpc/server_negotiation.h
@@ -34,6 +34,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/security/CMakeLists.txt b/src/kudu/security/CMakeLists.txt
index b6ae528..509d01b 100644
--- a/src/kudu/security/CMakeLists.txt
+++ b/src/kudu/security/CMakeLists.txt
@@ -62,6 +62,7 @@ set(SECURITY_SRCS
   init.cc
   openssl_util.cc
   ${PORTED_X509_CHECK_HOST_CC}
+  security_flags.cc
   simple_acl.cc
   tls_context.cc
   tls_handshake.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/init.cc b/src/kudu/security/init.cc
index 468f0d0..cd15df3 100644
--- a/src/kudu/security/init.cc
+++ b/src/kudu/security/init.cc
@@ -40,9 +40,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
-#include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/flag_validators.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/rw_mutex.h"
@@ -50,17 +48,6 @@
 #include "kudu/util/status.h"
 #include "kudu/util/thread.h"
 
-DEFINE_string(keytab_file, "",
-              "Path to the Kerberos Keytab file for this server. Specifying a "
-              "keytab file will cause the server to kinit, and enable Kerberos "
-              "to be used to authenticate RPC connections.");
-TAG_FLAG(keytab_file, stable);
-
-DEFINE_bool(allow_world_readable_credentials, false,
-            "Enable the use of keytab files and TLS private keys with "
-            "world-readable permissions.");
-TAG_FLAG(allow_world_readable_credentials, unsafe);
-
 #ifndef __APPLE__
 static constexpr bool kDefaultSystemAuthToLocal = true;
 #else
@@ -91,26 +78,6 @@ namespace {
 
 class KinitContext;
 
-bool ValidateKeytabPermissions() {
-  if (!FLAGS_keytab_file.empty() && !FLAGS_allow_world_readable_credentials) {
-    bool world_readable_keytab;
-    Status s = Env::Default()->IsFileWorldReadable(FLAGS_keytab_file, &world_readable_keytab);
-    if (!s.ok()) {
-      LOG(ERROR) << Substitute("$0: could not verify keytab file does not have world-readable "
-                               "permissions: $1", FLAGS_keytab_file, s.ToString());
-      return false;
-    }
-    if (world_readable_keytab) {
-      LOG(ERROR) << "cannot use keytab file with world-readable permissions: "
-                 << FLAGS_keytab_file;
-      return false;
-    }
-  }
-
-  return true;
-}
-GROUP_FLAG_VALIDATOR(keytab_permissions, &ValidateKeytabPermissions);
-
 // Global context for usage of the Krb5 library.
 krb5_context g_krb5_ctx;
 
@@ -465,12 +432,12 @@ boost::optional<string> GetLoggedInUsernameFromKeytab() {
   return g_kinit_ctx->username_str();
 }
 
-Status InitKerberosForServer(const std::string& raw_principal, const std::string& krb5ccname,
-    bool disable_krb5_replay_cache) {
-  if (FLAGS_keytab_file.empty()) return Status::OK();
+Status InitKerberosForServer(const std::string& raw_principal, const std::string& keytab_file,
+    const std::string& krb5ccname, bool disable_krb5_replay_cache) {
+  if (keytab_file.empty()) return Status::OK();
 
   setenv("KRB5CCNAME", krb5ccname.c_str(), 1);
-  setenv("KRB5_KTNAME", FLAGS_keytab_file.c_str(), 1);
+  setenv("KRB5_KTNAME", keytab_file.c_str(), 1);
 
   if (disable_krb5_replay_cache) {
     // KUDU-1897: disable the Kerberos replay cache. The KRPC protocol includes a
@@ -484,7 +451,7 @@ Status InitKerberosForServer(const std::string& raw_principal, const std::string
   string configured_principal;
   RETURN_NOT_OK(GetConfiguredPrincipal(raw_principal, &configured_principal));
   RETURN_NOT_OK_PREPEND(g_kinit_ctx->Kinit(
-      FLAGS_keytab_file, configured_principal), "unable to kinit");
+      keytab_file, configured_principal), "unable to kinit");
 
   g_kerberos_reinit_lock = new RWMutex(RWMutex::Priority::PREFER_WRITING);
   scoped_refptr<Thread> reacquire_thread;

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/init.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/init.h b/src/kudu/security/init.h
index dccbcc7..8b1519a 100644
--- a/src/kudu/security/init.h
+++ b/src/kudu/security/init.h
@@ -39,10 +39,13 @@ static const std::string kKrb5CCName = "MEMORY:kudu";
 // the '--keytab_file' command line flag.
 // 'raw_principal' is the principal to Kinit with after calling GetConfiguredPrincipal()
 // on it.
+// 'keytab_file' is the path to the kerberos keytab file. If it's an empty string, kerberos
+// will not be initialized.
 // 'krb5ccname' is passed into the KRB5CCNAME env var.
 // 'disable_krb5_replay_cache' if set to true, disables the kerberos replay cache by setting
 // the KRB5RCACHETYPE env var to "none".
 Status InitKerberosForServer(const std::string& raw_principal,
+                             const std::string& keytab_file,
                              const std::string& krb5ccname = kKrb5CCName,
                              bool disable_krb5_replay_cache = true);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/security_flags.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/security_flags.cc b/src/kudu/security/security_flags.cc
new file mode 100644
index 0000000..acdd662
--- /dev/null
+++ b/src/kudu/security/security_flags.cc
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/security_flags.h"
+
+namespace kudu {
+namespace security {
+
+// This is the "modern compatibility" cipher list of the Mozilla Security
+// Server Side TLS recommendations, accessed Feb. 2017, with the addition of
+// the non ECDH/DH AES cipher suites from the "intermediate compatibility"
+// list. These additional ciphers maintain compatibility with RHEL 6.5 and
+// below. The DH AES ciphers are not included since we are not configured to
+// use DH key agreement.
+const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsCiphers =
+                                   "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
+                                   "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
+                                   "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
+                                   "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
+                                   "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
+                                   "AES256-GCM-SHA384:AES128-GCM-SHA256:"
+                                   "AES256-SHA256:AES128-SHA256:"
+                                   "AES256-SHA:AES128-SHA";
+
+const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsMinVersion = "TLSv1";
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/security_flags.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/security_flags.h b/src/kudu/security/security_flags.h
new file mode 100644
index 0000000..e64536d
--- /dev/null
+++ b/src/kudu/security/security_flags.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/util/flags.h"
+
+namespace kudu {
+namespace security {
+
+// Authentication configuration for RPC connections.
+typedef TriStateFlag RpcAuthentication;
+
+// Encryption configuration for RPC connections.
+typedef TriStateFlag RpcEncryption;
+
+struct SecurityDefaults {
+  static const char* const kDefaultTlsCiphers;
+  static const char* const kDefaultTlsMinVersion;
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/test/mini_kdc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc-test.cc b/src/kudu/security/test/mini_kdc-test.cc
index d8efa81..e0ba455 100644
--- a/src/kudu/security/test/mini_kdc-test.cc
+++ b/src/kudu/security/test/mini_kdc-test.cc
@@ -18,7 +18,6 @@
 #include <string>
 
 #include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
 #include "kudu/security/init.h"
@@ -29,9 +28,6 @@
 
 using std::string;
 
-DECLARE_string(keytab_file);
-DECLARE_string(principal);
-
 namespace kudu {
 
 class MiniKdcTest : public KuduTest {};
@@ -76,8 +72,7 @@ TEST_F(MiniKdcTest, TestBasicOperation) {
 
   // Test programmatic keytab login.
   kdc.SetKrb5Environment();
-  FLAGS_keytab_file = kt_path;
-  ASSERT_OK(security::InitKerberosForServer(kSPN));
+  ASSERT_OK(security::InitKerberosForServer(kSPN, kt_path));
   ASSERT_EQ("kudu/foo.example.com@KRBTEST.COM", *security::GetLoggedInPrincipalFromKeytab());
 
   // Test principal canonicalization.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_context.cc b/src/kudu/security/tls_context.cc
index f708ac7..9321862 100644
--- a/src/kudu/security/tls_context.cc
+++ b/src/kudu/security/tls_context.cc
@@ -39,6 +39,7 @@
 #include "kudu/security/crypto.h"
 #include "kudu/security/init.h"
 #include "kudu/security/openssl_util.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/net/net_util.h"
@@ -56,32 +57,6 @@ DEFINE_int32(ipki_server_key_size, 2048,
              "is used for TLS connections to and from clients and other servers.");
 TAG_FLAG(ipki_server_key_size, experimental);
 
-DEFINE_string(rpc_tls_ciphers,
-              // This is the "modern compatibility" cipher list of the Mozilla Security
-              // Server Side TLS recommendations, accessed Feb. 2017, with the addition of
-              // the non ECDH/DH AES cipher suites from the "intermediate compatibility"
-              // list. These additional ciphers maintain compatibility with RHEL 6.5 and
-              // below. The DH AES ciphers are not included since we are not configured to
-              // use DH key agreement.
-              "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
-              "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
-              "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
-              "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
-              "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
-              "AES256-GCM-SHA384:AES128-GCM-SHA256:"
-              "AES256-SHA256:AES128-SHA256:"
-              "AES256-SHA:AES128-SHA",
-              "The cipher suite preferences to use for TLS-secured RPC connections. "
-              "Uses the OpenSSL cipher preference list format. See man (1) ciphers "
-              "for more information.");
-TAG_FLAG(rpc_tls_ciphers, advanced);
-
-DEFINE_string(rpc_tls_min_protocol, "TLSv1",
-              "The minimum protocol version to allow when for securing RPC "
-              "connections with TLS. May be one of 'TLSv1', 'TLSv1.1', or "
-              "'TLSv1.2'.");
-TAG_FLAG(rpc_tls_min_protocol, advanced);
-
 namespace kudu {
 namespace security {
 
@@ -95,7 +70,19 @@ template<> struct SslTypeTraits<X509_STORE_CTX> {
 };
 
 TlsContext::TlsContext()
-    : lock_(RWMutex::Priority::PREFER_READING),
+    : tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
+      lock_(RWMutex::Priority::PREFER_READING),
+      trusted_cert_count_(0),
+      has_cert_(false),
+      is_external_cert_(false) {
+  security::InitializeOpenSSL();
+}
+
+TlsContext::TlsContext(std::string tls_ciphers, std::string tls_min_protocol)
+    : tls_ciphers_(std::move(tls_ciphers)),
+      tls_min_protocol_(std::move(tls_min_protocol)),
+      lock_(RWMutex::Priority::PREFER_READING),
       trusted_cert_count_(0),
       has_cert_(false),
       is_external_cert_(false) {
@@ -126,7 +113,7 @@ Status TlsContext::Init() {
   //   https://tools.ietf.org/html/rfc7525#section-3.3
   auto options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION;
 
-  if (boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1.2")) {
+  if (boost::iequals(tls_min_protocol_, "TLSv1.2")) {
 #if OPENSSL_VERSION_NUMBER < 0x10001000L
     return Status::InvalidArgument(
         "--rpc_tls_min_protocol=TLSv1.2 is not be supported on this platform. "
@@ -134,7 +121,7 @@ Status TlsContext::Init() {
 #else
     options |= SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1;
 #endif
-  } else if (boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1.1")) {
+  } else if (boost::iequals(tls_min_protocol_, "TLSv1.1")) {
 #if OPENSSL_VERSION_NUMBER < 0x10001000L
     return Status::InvalidArgument(
         "--rpc_tls_min_protocol=TLSv1.1 is not be supported on this platform. "
@@ -142,15 +129,15 @@ Status TlsContext::Init() {
 #else
     options |= SSL_OP_NO_TLSv1;
 #endif
-  } else if (!boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1")) {
+  } else if (!boost::iequals(tls_min_protocol_, "TLSv1")) {
     return Status::InvalidArgument("unknown value provided for --rpc_tls_min_protocol flag",
-                                   FLAGS_rpc_tls_min_protocol);
+                                   tls_min_protocol_);
   }
 
   SSL_CTX_set_options(ctx_.get(), options);
 
   OPENSSL_RET_NOT_OK(
-      SSL_CTX_set_cipher_list(ctx_.get(), FLAGS_rpc_tls_ciphers.c_str()),
+      SSL_CTX_set_cipher_list(ctx_.get(), tls_ciphers_.c_str()),
       "failed to set TLS ciphers");
 
   // Enable ECDH curves. For OpenSSL 1.1.0 and up, this is done automatically.

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/security/tls_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/security/tls_context.h b/src/kudu/security/tls_context.h
index 33ca403..4a4dba1 100644
--- a/src/kudu/security/tls_context.h
+++ b/src/kudu/security/tls_context.h
@@ -71,6 +71,8 @@ class TlsContext {
 
   TlsContext();
 
+  TlsContext(std::string tls_ciphers, std::string tls_min_protocol);
+
   ~TlsContext() = default;
 
   Status Init() WARN_UNUSED_RESULT;
@@ -174,6 +176,14 @@ class TlsContext {
 
   Status VerifyCertChainUnlocked(const Cert& cert) WARN_UNUSED_RESULT;
 
+  // The cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  std::string tls_ciphers_;
+
+  // The minimum protocol version to allow when for securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  std::string tls_min_protocol_;
+
   // Protects all members.
   //
   // Taken in write mode when any changes are modifying the underlying SSL_CTX

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/server/server_base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 668eeb5..269c098 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -49,6 +49,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/security/init.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/server/default_path_handlers.h"
 #include "kudu/server/generic_service.h"
 #include "kudu/server/glog_metrics.h"
@@ -62,6 +63,8 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
+#include "kudu/util/flags.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/mem_tracker.h"
@@ -87,6 +90,11 @@ TAG_FLAG(min_negotiation_threads, advanced);
 DEFINE_int32(max_negotiation_threads, 50, "Maximum number of connection negotiation threads.");
 TAG_FLAG(max_negotiation_threads, advanced);
 
+DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
+             "Timeout for negotiating an RPC connection.");
+TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
+TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
+
 DEFINE_bool(webserver_enabled, true, "Whether to enable the web server on this daemon. "
             "NOTE: disabling the web server is also likely to prevent monitoring systems "
             "from properly capturing metrics.");
@@ -119,8 +127,85 @@ TAG_FLAG(principal, experimental);
 // See KUDU-1884.
 TAG_FLAG(principal, unsafe);
 
+
+DEFINE_string(keytab_file, "",
+              "Path to the Kerberos Keytab file for this server. Specifying a "
+              "keytab file will cause the server to kinit, and enable Kerberos "
+              "to be used to authenticate RPC connections.");
+TAG_FLAG(keytab_file, stable);
+
+DEFINE_bool(allow_world_readable_credentials, false,
+            "Enable the use of keytab files and TLS private keys with "
+            "world-readable permissions.");
+TAG_FLAG(allow_world_readable_credentials, unsafe);
+
+DEFINE_string(rpc_authentication, "optional",
+              "Whether to require RPC connections to authenticate. Must be one "
+              "of 'disabled', 'optional', or 'required'. If 'optional', "
+              "authentication will be used when the remote end supports it. If "
+              "'required', connections which are not able to authenticate "
+              "(because the remote end lacks support) are rejected. Secure "
+              "clusters should use 'required'.");
+DEFINE_string(rpc_encryption, "optional",
+              "Whether to require RPC connections to be encrypted. Must be one "
+              "of 'disabled', 'optional', or 'required'. If 'optional', "
+              "encryption will be used when the remote end supports it. If "
+              "'required', connections which are not able to use encryption "
+              "(because the remote end lacks support) are rejected. If 'disabled', "
+              "encryption will not be used, and RPC authentication "
+              "(--rpc_authentication) must also be disabled as well. "
+              "Secure clusters should use 'required'.");
+TAG_FLAG(rpc_authentication, evolving);
+TAG_FLAG(rpc_encryption, evolving);
+
+DEFINE_string(rpc_tls_ciphers,
+              kudu::security::SecurityDefaults::kDefaultTlsCiphers,
+              "The cipher suite preferences to use for TLS-secured RPC connections. "
+              "Uses the OpenSSL cipher preference list format. See man (1) ciphers "
+              "for more information.");
+TAG_FLAG(rpc_tls_ciphers, advanced);
+
+DEFINE_string(rpc_tls_min_protocol,
+              kudu::security::SecurityDefaults::kDefaultTlsMinVersion,
+              "The minimum protocol version to allow when for securing RPC "
+              "connections with TLS. May be one of 'TLSv1', 'TLSv1.1', or "
+              "'TLSv1.2'.");
+TAG_FLAG(rpc_tls_min_protocol, advanced);
+
+DEFINE_string(rpc_certificate_file, "",
+              "Path to a PEM encoded X509 certificate to use for securing RPC "
+              "connections with SSL/TLS. If set, '--rpc_private_key_file' and "
+              "'--rpc_ca_certificate_file' must be set as well.");
+DEFINE_string(rpc_private_key_file, "",
+              "Path to a PEM encoded private key paired with the certificate "
+              "from '--rpc_certificate_file'");
+DEFINE_string(rpc_ca_certificate_file, "",
+              "Path to the PEM encoded X509 certificate of the trusted external "
+              "certificate authority. The provided certificate should be the root "
+              "issuer of the certificate passed in '--rpc_certificate_file'.");
+DEFINE_string(rpc_private_key_password_cmd, "", "A Unix command whose output "
+              "returns the password used to decrypt the RPC server's private key "
+              "file specified in --rpc_private_key_file. If the .PEM key file is "
+              "not password-protected, this flag does not need to be set. "
+              "Trailing whitespace will be trimmed before it is used to decrypt "
+              "the private key.");
+
+// Setting TLS certs and keys via CLI flags is only necessary for external
+// PKI-based security, which is not yet production ready. Instead, see
+// internal PKI (ipki) and Kerberos-based authentication.
+TAG_FLAG(rpc_certificate_file, experimental);
+TAG_FLAG(rpc_private_key_file, experimental);
+TAG_FLAG(rpc_ca_certificate_file, experimental);
+
+DEFINE_int32(rpc_default_keepalive_time_ms, 65000,
+             "If an RPC connection from a client is idle for this amount of time, the server "
+             "will disconnect the client.");
+TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
+
 DECLARE_bool(use_hybrid_clock);
 
+using kudu::security::RpcAuthentication;
+using kudu::security::RpcEncryption;
 using std::ostringstream;
 using std::shared_ptr;
 using std::string;
@@ -135,6 +220,111 @@ namespace server {
 
 namespace {
 
+bool ValidateKeytabPermissions() {
+  if (!FLAGS_keytab_file.empty() && !FLAGS_allow_world_readable_credentials) {
+    bool world_readable_keytab;
+    Status s = Env::Default()->IsFileWorldReadable(FLAGS_keytab_file, &world_readable_keytab);
+    if (!s.ok()) {
+      LOG(ERROR) << Substitute("$0: could not verify keytab file does not have world-readable "
+                               "permissions: $1", FLAGS_keytab_file, s.ToString());
+      return false;
+    }
+    if (world_readable_keytab) {
+      LOG(ERROR) << "cannot use keytab file with world-readable permissions: "
+                 << FLAGS_keytab_file;
+      return false;
+    }
+  }
+
+  return true;
+}
+GROUP_FLAG_VALIDATOR(keytab_permissions, &ValidateKeytabPermissions);
+
+} // namespace
+
+static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) {
+  security::RpcAuthentication result;
+  Status s = ParseTriState(flag_name, flag_value, &result);
+  if (!s.ok()) {
+    LOG(ERROR) << s.message().ToString();
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication);
+
+static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) {
+  security::RpcEncryption result;
+  Status s = ParseTriState(flag_name, flag_value, &result);
+  if (!s.ok()) {
+    LOG(ERROR) << s.message().ToString();
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(rpc_encryption, &ValidateRpcEncryption);
+
+static bool ValidateRpcAuthnFlags() {
+  security::RpcAuthentication authentication;
+  CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication));
+
+  security::RpcEncryption encryption;
+  CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption));
+
+  if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) {
+    LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled "
+                  "if RPC encryption (--rpc_encryption) is disabled";
+    return false;
+  }
+
+  const bool has_keytab = !FLAGS_keytab_file.empty();
+  const bool has_cert = !FLAGS_rpc_certificate_file.empty();
+  if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) {
+    LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be "
+                  "required unless Kerberos (--keytab_file) or external PKI "
+                  "(--rpc_certificate_file et al) are configured";
+    return false;
+  }
+
+  return true;
+}
+GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags);
+
+static bool ValidateExternalPkiFlags() {
+  bool has_cert = !FLAGS_rpc_certificate_file.empty();
+  bool has_key = !FLAGS_rpc_private_key_file.empty();
+  bool has_ca = !FLAGS_rpc_ca_certificate_file.empty();
+
+  if (has_cert != has_key || has_cert != has_ca) {
+    LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and "
+                  "--rpc_ca_certificate_file flags must be set as a group; "
+                  "i.e. either set all or none of them.";
+    return false;
+  }
+
+  if (has_key && !FLAGS_allow_world_readable_credentials) {
+    bool world_readable_private_key;
+    Status s = Env::Default()->IsFileWorldReadable(FLAGS_rpc_private_key_file,
+                                                   &world_readable_private_key);
+    if (!s.ok()) {
+      LOG(ERROR) << Substitute("$0: could not verify private key file does not have "
+                               "world-readable permissions: $1",
+                               FLAGS_rpc_private_key_file, s.ToString());
+      return false;
+    }
+    if (world_readable_private_key) {
+      LOG(ERROR) << "cannot use private key file with world-readable permissions: "
+                 << FLAGS_rpc_private_key_file;
+      return false;
+    }
+  }
+
+  return true;
+}
+GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags);
+
+namespace {
+
 // Disambiguates between servers when in a minicluster.
 AtomicInt<int32_t> mem_tracker_id_counter(-1);
 
@@ -231,7 +421,7 @@ Status ServerBase::Init() {
   // if we're having clock problems.
   RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");
 
-  RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal));
+  RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
 
   fs::FsReport report;
   Status s = fs_manager_->Open(&report);
@@ -255,7 +445,18 @@ Status ServerBase::Init() {
          .set_min_negotiation_threads(FLAGS_min_negotiation_threads)
          .set_max_negotiation_threads(FLAGS_max_negotiation_threads)
          .set_metric_entity(metric_entity())
+         .set_connection_keep_alive_time(FLAGS_rpc_default_keepalive_time_ms)
+         .set_rpc_negotiation_timeout_ms(FLAGS_rpc_negotiation_timeout_ms)
+         .set_rpc_authentication(FLAGS_rpc_authentication)
+         .set_rpc_encryption(FLAGS_rpc_encryption)
+         .set_rpc_tls_ciphers(FLAGS_rpc_tls_ciphers)
+         .set_rpc_tls_min_protocol(FLAGS_rpc_tls_min_protocol)
+         .set_epki_cert_key_files(FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file)
+         .set_epki_certificate_authority_file(FLAGS_rpc_ca_certificate_file)
+         .set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd)
+         .set_keytab_file(FLAGS_keytab_file)
          .enable_inbound_tls();
+
   RETURN_NOT_OK(builder.Build(&messenger_));
 
   RETURN_NOT_OK(rpc_server_->Init(messenger_));

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/server/webserver_options.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/webserver_options.cc b/src/kudu/server/webserver_options.cc
index a0bf92c..65bdc56 100644
--- a/src/kudu/server/webserver_options.cc
+++ b/src/kudu/server/webserver_options.cc
@@ -26,6 +26,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flag_validators.h"
 
@@ -97,20 +98,13 @@ TAG_FLAG(webserver_port, stable);
 
 DEFINE_string(webserver_tls_ciphers,
               // See security/tls_context.cc for origin of this list.
-              "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
-              "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
-              "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
-              "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
-              "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
-              "AES256-GCM-SHA384:AES128-GCM-SHA256:"
-              "AES256-SHA256:AES128-SHA256:"
-              "AES256-SHA:AES128-SHA",
+              kudu::security::SecurityDefaults::kDefaultTlsCiphers,
               "The cipher suite preferences to use for webserver HTTPS connections. "
               "Uses the OpenSSL cipher preference list format. See man (1) ciphers "
               "for more information.");
 TAG_FLAG(webserver_tls_ciphers, advanced);
 
-DEFINE_string(webserver_tls_min_protocol, "TLSv1",
+DEFINE_string(webserver_tls_min_protocol, kudu::security::SecurityDefaults::kDefaultTlsMinVersion,
               "The minimum protocol version to allow when for webserver HTTPS "
               "connections. May be one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.");
 TAG_FLAG(webserver_tls_min_protocol, advanced);

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/util/flags.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/flags.cc b/src/kudu/util/flags.cc
index edb232b..2a7398f 100644
--- a/src/kudu/util/flags.cc
+++ b/src/kudu/util/flags.cc
@@ -29,6 +29,7 @@
 
 #include <sys/stat.h>
 
+#include <boost/algorithm/string/predicate.hpp>
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -559,4 +560,20 @@ GFlagsMap GetFlagsMap() {
   return flags_by_name;
 }
 
+Status ParseTriState(const char* flag_name, const std::string& flag_value,
+    TriStateFlag* tri_state) {
+  if (boost::iequals(flag_value, "required")) {
+    *tri_state = TriStateFlag::REQUIRED;
+  } else if (boost::iequals(flag_value, "optional")) {
+    *tri_state = TriStateFlag::OPTIONAL;
+  } else if (boost::iequals(flag_value, "disabled")) {
+    *tri_state = TriStateFlag::DISABLED;
+  } else {
+    return Status::InvalidArgument(strings::Substitute(
+          "$0 flag must be one of 'required', 'optional', or 'disabled'",
+          flag_name));
+  }
+  return Status::OK();
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b357fa9b/src/kudu/util/flags.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/flags.h b/src/kudu/util/flags.h
index 2c5159a..032c3e3 100644
--- a/src/kudu/util/flags.h
+++ b/src/kudu/util/flags.h
@@ -21,6 +21,8 @@
 #include <string>
 #include <unordered_map>
 
+#include "kudu/util/status.h"
+
 namespace google {
   struct CommandLineFlagInfo;
 }
@@ -72,5 +74,14 @@ std::string GetNonDefaultFlags(const GFlagsMap& default_flags);
 
 GFlagsMap GetFlagsMap();
 
+enum class TriStateFlag {
+  DISABLED,
+  OPTIONAL,
+  REQUIRED,
+};
+
+Status ParseTriState(const char* flag_name, const std::string& flag_value,
+    TriStateFlag* tri_state);
+
 } // namespace kudu
 #endif /* KUDU_UTIL_FLAGS_H */