You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/01/03 16:48:11 UTC

impala git commit: KUDU-2228: Make Messenger options configurable

Repository: impala
Updated Branches:
  refs/heads/master 545163bb0 -> 6f2ebadf8


KUDU-2228: Make Messenger options configurable

Currently, the RPC layer accesses many gflags directly to make
certain decisions, e.g. whether to turn on encryption,
authentication, etc.

Since the RPC layer is to be used more like a library, these should
be configurable options that are passed to the Messenger
(which is the API endpoint for the application using the RPC layer),
instead of the RPC layer itself directly accessing these flags.

This patch converts the following flags to Messenger options and moves
the flag definitions to server_base.cc which is the "application" in
Kudu that uses the Messenger:

FLAGS_rpc_default_keepalive_time_ms
FLAGS_rpc_negotiation_timeout_ms
FLAGS_rpc_authentication
FLAGS_rpc_encryption
FLAGS_rpc_tls_ciphers
FLAGS_rpc_tls_min_protocol
FLAGS_rpc_certificate_file
FLAGS_rpc_private_key_file
FLAGS_rpc_ca_certificate_file
FLAGS_rpc_private_key_password_cmd
FLAGS_keytab_file

Most of the remaining flags are test or benchmark related flags. There
may be a few more flags that can be moved out and converted to options,
but we can leave that as future work if we decide to move them.

In addition to the cherry-pick above, this change also updates Impala
code to pass the keytab file to InitKerberosForServer(), which was changed
by this Kudu patch.

Change-Id: Ia21814ffb6e283c2791985b089878b579905f0ba
Reviewed-on: http://gerrit.cloudera.org:8080/8789
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/8878
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6f2ebadf
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6f2ebadf
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6f2ebadf

Branch: refs/heads/master
Commit: 6f2ebadf8d119b1486f54b911ba3c7ecc1921d55
Parents: 545163b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Thu Dec 7 03:25:11 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jan 3 11:45:48 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/client_negotiation.cc      |   2 -
 be/src/kudu/rpc/client_negotiation.h       |   5 +-
 be/src/kudu/rpc/messenger.cc               | 237 ++++++++----------------
 be/src/kudu/rpc/messenger.h                |  86 +++++++--
 be/src/kudu/rpc/negotiation.cc             |  11 +-
 be/src/kudu/rpc/negotiation.h              |   7 +-
 be/src/kudu/rpc/reactor.cc                 |   7 +-
 be/src/kudu/rpc/rpc-test-base.h            |  33 +++-
 be/src/kudu/rpc/rpc-test.cc                |  48 +++--
 be/src/kudu/rpc/sasl_common.cc             |  24 +--
 be/src/kudu/rpc/sasl_common.h              |   2 +-
 be/src/kudu/rpc/server_negotiation.cc      |   2 -
 be/src/kudu/rpc/server_negotiation.h       |   5 +-
 be/src/kudu/security/CMakeLists.txt        |   1 +
 be/src/kudu/security/init.cc               |  13 +-
 be/src/kudu/security/init.h                |   3 +
 be/src/kudu/security/security_flags.cc     |  42 +++++
 be/src/kudu/security/security_flags.h      |  36 ++++
 be/src/kudu/security/test/mini_kdc-test.cc |   9 +-
 be/src/kudu/security/tls_context.cc        |  51 ++---
 be/src/kudu/security/tls_context.h         |  10 +
 be/src/kudu/util/flags.cc                  |  17 ++
 be/src/kudu/util/flags.h                   |  11 +-
 be/src/rpc/authentication.cc               |   2 +-
 24 files changed, 381 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.cc b/be/src/kudu/rpc/client_negotiation.cc
index 24ef46f..1431a4d 100644
--- a/be/src/kudu/rpc/client_negotiation.cc
+++ b/be/src/kudu/rpc/client_negotiation.cc
@@ -280,8 +280,6 @@ Status ClientNegotiation::SendConnectionHeader() {
 }
 
 Status ClientNegotiation::InitSaslClient() {
-  RETURN_NOT_OK(SaslInit());
-
   // TODO(KUDU-1922): consider setting SASL_SUCCESS_DATA
   unsigned flags = 0;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h
index 6650ccd..4db4a5f 100644
--- a/be/src/kudu/rpc/client_negotiation.h
+++ b/be/src/kudu/rpc/client_negotiation.h
@@ -30,6 +30,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/util/monotime.h"
@@ -61,7 +62,7 @@ class ClientNegotiation {
   ClientNegotiation(std::unique_ptr<Socket> socket,
                     const security::TlsContext* tls_context,
                     const boost::optional<security::SignedTokenPB>& authn_token,
-                    RpcEncryption encryption,
+                    security::RpcEncryption encryption,
                     std::string sasl_proto_name);
 
   // Enable PLAIN authentication.
@@ -220,7 +221,7 @@ class ClientNegotiation {
   // TLS state.
   const security::TlsContext* tls_context_;
   security::TlsHandshake tls_handshake_;
-  const RpcEncryption encryption_;
+  const security::RpcEncryption encryption_;
   bool tls_negotiated_;
 
   // TSK state.

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
index 91ccc15..be18b7d 100644
--- a/be/src/kudu/rpc/messenger.cc
+++ b/be/src/kudu/rpc/messenger.cc
@@ -27,8 +27,6 @@
 #include <set>
 #include <string>
 
-#include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
@@ -47,9 +45,7 @@
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token_verifier.h"
-#include "kudu/util/errno.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/flag_validators.h"
+#include "kudu/util/flags.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"
@@ -63,152 +59,22 @@ using std::shared_ptr;
 using std::make_shared;
 using strings::Substitute;
 
-DEFINE_string_hidden(rpc_authentication, "optional",
-              "Whether to require RPC connections to authenticate. Must be one "
-              "of 'disabled', 'optional', or 'required'. If 'optional', "
-              "authentication will be used when the remote end supports it. If "
-              "'required', connections which are not able to authenticate "
-              "(because the remote end lacks support) are rejected. Secure "
-              "clusters should use 'required'.");
-DEFINE_string_hidden(rpc_encryption, "optional",
-              "Whether to require RPC connections to be encrypted. Must be one "
-              "of 'disabled', 'optional', or 'required'. If 'optional', "
-              "encryption will be used when the remote end supports it. If "
-              "'required', connections which are not able to use encryption "
-              "(because the remote end lacks support) are rejected. If 'disabled', "
-              "encryption will not be used, and RPC authentication "
-              "(--rpc_authentication) must also be disabled as well. "
-              "Secure clusters should use 'required'.");
-TAG_FLAG(rpc_authentication, evolving);
-TAG_FLAG(rpc_encryption, evolving);
-
-DEFINE_string_hidden(rpc_certificate_file, "",
-              "Path to a PEM encoded X509 certificate to use for securing RPC "
-              "connections with SSL/TLS. If set, '--rpc_private_key_file' and "
-              "'--rpc_ca_certificate_file' must be set as well.");
-DEFINE_string_hidden(rpc_private_key_file, "",
-              "Path to a PEM encoded private key paired with the certificate "
-              "from '--rpc_certificate_file'");
-DEFINE_string_hidden(rpc_ca_certificate_file, "",
-              "Path to the PEM encoded X509 certificate of the trusted external "
-              "certificate authority. The provided certificate should be the root "
-              "issuer of the certificate passed in '--rpc_certificate_file'.");
-DEFINE_string_hidden(rpc_private_key_password_cmd, "", "A Unix command whose output "
-              "returns the password used to decrypt the RPC server's private key "
-              "file specified in --rpc_private_key_file. If the .PEM key file is "
-              "not password-protected, this flag does not need to be set. "
-              "Trailing whitespace will be trimmed before it is used to decrypt "
-              "the private key.");
-
-// Setting TLS certs and keys via CLI flags is only necessary for external
-// PKI-based security, which is not yet production ready. Instead, see
-// internal PKI (ipki) and Kerberos-based authentication.
-TAG_FLAG(rpc_certificate_file, experimental);
-TAG_FLAG(rpc_private_key_file, experimental);
-TAG_FLAG(rpc_ca_certificate_file, experimental);
-
-DEFINE_int32_hidden(rpc_default_keepalive_time_ms, 65000,
-             "If an RPC connection from a client is idle for this amount of time, the server "
-             "will disconnect the client.");
-TAG_FLAG(rpc_default_keepalive_time_ms, advanced);
-
-DECLARE_string(keytab_file);
-
 namespace kudu {
 namespace rpc {
 
-class Messenger;
-class ServerBuilder;
-
-template <typename T>
-static Status ParseTriState(const char* flag_name, const string& flag_value, T* tri_state) {
-  if (boost::iequals(flag_value, "required")) {
-    *tri_state = T::REQUIRED;
-  } else if (boost::iequals(flag_value, "optional")) {
-    *tri_state = T::OPTIONAL;
-  } else if (boost::iequals(flag_value, "disabled")) {
-    *tri_state = T::DISABLED;
-  } else {
-    return Status::InvalidArgument(Substitute(
-          "$0 flag must be one of 'required', 'optional', or 'disabled'",
-          flag_name));
-  }
-  return Status::OK();
-}
-
-static bool ValidateRpcAuthentication(const char* flag_name, const string& flag_value) {
-  RpcAuthentication result;
-  Status s = ParseTriState(flag_name, flag_value, &result);
-  if (!s.ok()) {
-    LOG(ERROR) << s.message().ToString();
-    return false;
-  }
-  return true;
-}
-DEFINE_validator(rpc_authentication, &ValidateRpcAuthentication);
-
-static bool ValidateRpcEncryption(const char* flag_name, const string& flag_value) {
-  RpcEncryption result;
-  Status s = ParseTriState(flag_name, flag_value, &result);
-  if (!s.ok()) {
-    LOG(ERROR) << s.message().ToString();
-    return false;
-  }
-  return true;
-}
-DEFINE_validator(rpc_encryption, &ValidateRpcEncryption);
-
-static bool ValidateRpcAuthnFlags() {
-  RpcAuthentication authentication;
-  CHECK_OK(ParseTriState("--rpc_authentication", FLAGS_rpc_authentication, &authentication));
-
-  RpcEncryption encryption;
-  CHECK_OK(ParseTriState("--rpc_encryption", FLAGS_rpc_encryption, &encryption));
-
-  if (encryption == RpcEncryption::DISABLED && authentication != RpcAuthentication::DISABLED) {
-    LOG(ERROR) << "RPC authentication (--rpc_authentication) must be disabled "
-                  "if RPC encryption (--rpc_encryption) is disabled";
-    return false;
-  }
-
-  const bool has_keytab = !FLAGS_keytab_file.empty();
-  const bool has_cert = !FLAGS_rpc_certificate_file.empty();
-  if (authentication == RpcAuthentication::REQUIRED && !has_keytab && !has_cert) {
-    LOG(ERROR) << "RPC authentication (--rpc_authentication) may not be "
-                  "required unless Kerberos (--keytab_file) or external PKI "
-                  "(--rpc_certificate_file et al) are configured";
-    return false;
-  }
-
-  return true;
-}
-GROUP_FLAG_VALIDATOR(rpc_authn_flags, ValidateRpcAuthnFlags);
-
-static bool ValidateExternalPkiFlags() {
-  bool has_cert = !FLAGS_rpc_certificate_file.empty();
-  bool has_key = !FLAGS_rpc_private_key_file.empty();
-  bool has_ca = !FLAGS_rpc_ca_certificate_file.empty();
-
-  if (has_cert != has_key || has_cert != has_ca) {
-    LOG(ERROR) << "--rpc_certificate_file, --rpc_private_key_file, and "
-                  "--rpc_ca_certificate_file flags must be set as a group; "
-                  "i.e. either set all or none of them.";
-    return false;
-  }
-
-  return true;
-}
-GROUP_FLAG_VALIDATOR(external_pki_flags, ValidateExternalPkiFlags);
-
 MessengerBuilder::MessengerBuilder(std::string name)
     : name_(std::move(name)),
-      connection_keepalive_time_(
-          MonoDelta::FromMilliseconds(FLAGS_rpc_default_keepalive_time_ms)),
+      connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
       num_reactors_(4),
       min_negotiation_threads_(0),
       max_negotiation_threads_(4),
       coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
+      rpc_negotiation_timeout_ms_(3000),
       sasl_proto_name_("kudu"),
+      rpc_authentication_("optional"),
+      rpc_encryption_("optional"),
+      rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
       enable_inbound_tls_(false) {
 }
 
@@ -243,8 +109,61 @@ MessengerBuilder &MessengerBuilder::set_metric_entity(
   return *this;
 }
 
-MessengerBuilder &MessengerBuilder::set_sasl_proto_name(std::string sasl_proto_name) {
-  sasl_proto_name_ = std::move(sasl_proto_name);
+MessengerBuilder &MessengerBuilder::set_connection_keep_alive_time(int32_t time_in_ms) {
+  connection_keepalive_time_ = MonoDelta::FromMilliseconds(time_in_ms);
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_negotiation_timeout_ms(int64_t time_in_ms) {
+  rpc_negotiation_timeout_ms_ = time_in_ms;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_sasl_proto_name(const std::string& sasl_proto_name) {
+  sasl_proto_name_ = sasl_proto_name;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_authentication(const std::string& rpc_authentication) {
+  rpc_authentication_ = rpc_authentication;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_encryption(const std::string& rpc_encryption) {
+  rpc_encryption_ = rpc_encryption;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers) {
+  rpc_tls_ciphers_ = rpc_tls_ciphers;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_min_protocol(
+    const std::string& rpc_tls_min_protocol) {
+  rpc_tls_min_protocol_ = rpc_tls_min_protocol;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_cert_key_files(
+    const std::string& cert, const std::string& private_key) {
+  rpc_certificate_file_ = cert;
+  rpc_private_key_file_ = private_key;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_certificate_authority_file(const std::string& ca) {
+  rpc_ca_certificate_file_ = ca;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_private_password_key_cmd(const std::string& cmd) {
+  rpc_private_key_password_cmd_ = cmd;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_keytab_file(const std::string& keytab_file) {
+  keytab_file_ = keytab_file;
   return *this;
 }
 
@@ -254,7 +173,8 @@ MessengerBuilder& MessengerBuilder::enable_inbound_tls() {
 }
 
 Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
-  RETURN_NOT_OK(SaslInit()); // Initialize SASL library before we start making requests
+  // Initialize SASL library before we start making requests
+  RETURN_NOT_OK(SaslInit(!keytab_file_.empty()));
 
   Messenger* new_msgr(new Messenger(*this));
 
@@ -263,33 +183,34 @@ Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
   });
 
   RETURN_NOT_OK(ParseTriState("--rpc_authentication",
-                              FLAGS_rpc_authentication,
+                              rpc_authentication_,
                               &new_msgr->authentication_));
 
   RETURN_NOT_OK(ParseTriState("--rpc_encryption",
-                              FLAGS_rpc_encryption,
+                              rpc_encryption_,
                               &new_msgr->encryption_));
 
   RETURN_NOT_OK(new_msgr->Init());
   if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
     auto* tls_context = new_msgr->mutable_tls_context();
 
-    if (!FLAGS_rpc_certificate_file.empty()) {
-      CHECK(!FLAGS_rpc_private_key_file.empty());
-      CHECK(!FLAGS_rpc_ca_certificate_file.empty());
+    if (!rpc_certificate_file_.empty()) {
+      CHECK(!rpc_private_key_file_.empty());
+      CHECK(!rpc_ca_certificate_file_.empty());
+
       // TODO(KUDU-1920): should we try and enforce that the server
       // is in the subject or alt names of the cert?
-      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(FLAGS_rpc_ca_certificate_file));
-      if (FLAGS_rpc_private_key_password_cmd.empty()) {
-        RETURN_NOT_OK(tls_context->LoadCertificateAndKey(FLAGS_rpc_certificate_file,
-                                                         FLAGS_rpc_private_key_file));
+      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(rpc_ca_certificate_file_));
+      if (rpc_private_key_password_cmd_.empty()) {
+        RETURN_NOT_OK(tls_context->LoadCertificateAndKey(rpc_certificate_file_,
+                                                         rpc_private_key_file_));
       } else {
         RETURN_NOT_OK(tls_context->LoadCertificateAndPasswordProtectedKey(
-            FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file,
+            rpc_certificate_file_, rpc_private_key_file_,
             [&](){
               string ret;
               WARN_NOT_OK(security::GetPasswordFromShellCommand(
-                  FLAGS_rpc_private_key_password_cmd, &ret),
+                  rpc_private_key_password_cmd_, &ret),
                   "could not get RPC password from configured command");
               return ret;
             }
@@ -353,7 +274,7 @@ Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr,
   // Before listening, if we expect to require Kerberos, we want to verify
   // that everything is set up correctly. This way we'll generate errors on
   // startup rather than later on when we first receive a client connection.
-  if (!FLAGS_keytab_file.empty()) {
+  if (!keytab_file_.empty()) {
     RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()),
                           "GSSAPI/Kerberos not properly configured");
   }
@@ -439,11 +360,13 @@ Messenger::Messenger(const MessengerBuilder &bld)
     closing_(false),
     authentication_(RpcAuthentication::REQUIRED),
     encryption_(RpcEncryption::REQUIRED),
-    tls_context_(new security::TlsContext()),
+    tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
     token_verifier_(new security::TokenVerifier()),
     rpcz_store_(new RpczStore()),
     metric_entity_(bld.metric_entity_),
+    rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
     sasl_proto_name_(bld.sasl_proto_name_),
+    keytab_file_(bld.keytab_file_),
     retain_self_(this) {
   for (int i = 0; i < bld.num_reactors_; i++) {
     reactors_.push_back(new Reactor(retain_self_, i, bld));

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index 8756842..b0099d3 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -32,6 +32,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/response_callback.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/metrics.h"
@@ -51,6 +52,9 @@ class TokenVerifier;
 
 namespace rpc {
 
+using security::RpcAuthentication;
+using security::RpcEncryption;
+
 class AcceptorPool;
 class DumpRunningRpcsRequestPB;
 class DumpRunningRpcsResponsePB;
@@ -75,20 +79,6 @@ struct AcceptorPoolInfo {
   Sockaddr bind_address_;
 };
 
-// Authentication configuration for RPC connections.
-enum class RpcAuthentication {
-  DISABLED,
-  OPTIONAL,
-  REQUIRED,
-};
-
-// Encryption configuration for RPC connections.
-enum class RpcEncryption {
-  DISABLED,
-  OPTIONAL,
-  REQUIRED,
-};
-
 // Used to construct a Messenger.
 class MessengerBuilder {
  public:
@@ -118,8 +108,54 @@ class MessengerBuilder {
   // Set metric entity for use by RPC systems.
   MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);
 
+  // Set the time in milliseconds after which an idle connection from a client will be
+  // disconnected by the server.
+  MessengerBuilder &set_connection_keep_alive_time(int32_t time_in_ms);
+
+  // Set the timeout for negotiating an RPC connection.
+  MessengerBuilder &set_rpc_negotiation_timeout_ms(int64_t time_in_ms);
+
   // Set the SASL protocol name that is used for the SASL negotiation.
-  MessengerBuilder &set_sasl_proto_name(std::string sasl_proto_name);
+  MessengerBuilder &set_sasl_proto_name(const std::string& sasl_proto_name);
+
+  // Set the state of authentication required. If 'optional', authentication will be used when
+  // the remote end supports it. If 'required', connections which are not able to authenticate
+  // (because the remote end lacks support) are rejected.
+  MessengerBuilder &set_rpc_authentication(const std::string& rpc_authentication);
+
+  // Set the state of encryption required. If 'optional', encryption will be used when the
+  // remote end supports it. If 'required', connections which are not able to use encryption
+  // (because the remote end lacks support) are rejected. If 'disabled', encryption will not
+  // be used, and RPC authentication (--rpc_authentication) must be disabled as well.
+  MessengerBuilder &set_rpc_encryption(const std::string& rpc_encryption);
+
+  // Set the cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  MessengerBuilder &set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers);
+
+  // Set the minimum protocol version to allow when securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  MessengerBuilder &set_rpc_tls_min_protocol(const std::string& rpc_tls_min_protocol);
+
+  // Set the TLS server certificate and private key files paths. If this is set in conjunction
+  // with enable_inbound_tls(), internal PKI will not be used for encrypted communication and
+  // external PKI will be used instead.
+  MessengerBuilder &set_epki_cert_key_files(
+      const std::string& cert, const std::string& private_key);
+
+  // Set the TLS Certificate Authority file path. Must always be set with set_epki_cert_key_files().
+  // If this is set in conjunction with enable_inbound_tls(), internal PKI will not be used for
+  // encrypted communication and external PKI will be used instead.
+  MessengerBuilder &set_epki_certificate_authority_file(const std::string& ca);
+
+  // Set a Unix command whose output returns the password used to decrypt the RPC server's private
+  // key file specified via set_epki_cert_key_files(). If the .PEM key file is not
+  // password-protected, this flag does not need to be set. Trailing whitespace will be trimmed
+  // before it is used to decrypt the private key.
+  MessengerBuilder &set_epki_private_password_key_cmd(const std::string& cmd);
+
+  // Set the path to the Kerberos Keytab file for this server.
+  MessengerBuilder &set_keytab_file(const std::string& keytab_file);
 
   // Configure the messenger to enable TLS encryption on inbound connections.
   MessengerBuilder& enable_inbound_tls();
@@ -134,7 +170,17 @@ class MessengerBuilder {
   int max_negotiation_threads_;
   MonoDelta coarse_timer_granularity_;
   scoped_refptr<MetricEntity> metric_entity_;
+  int64_t rpc_negotiation_timeout_ms_;
   std::string sasl_proto_name_;
+  std::string rpc_authentication_;
+  std::string rpc_encryption_;
+  std::string rpc_tls_ciphers_;
+  std::string rpc_tls_min_protocol_;
+  std::string rpc_certificate_file_;
+  std::string rpc_private_key_file_;
+  std::string rpc_ca_certificate_file_;
+  std::string rpc_private_key_password_cmd_;
+  std::string keytab_file_;
   bool enable_inbound_tls_;
 };
 
@@ -252,10 +298,14 @@ class Messenger {
 
   scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_.get(); }
 
+  const int64_t rpc_negotiation_timeout_ms() const { return rpc_negotiation_timeout_ms_; }
+
   const std::string& sasl_proto_name() const {
     return sasl_proto_name_;
   }
 
+  const std::string& keytab_file() const { return keytab_file_; }
+
   const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const;
 
  private:
@@ -318,9 +368,15 @@ class Messenger {
 
   scoped_refptr<MetricEntity> metric_entity_;
 
+  // Timeout in milliseconds after which an incomplete connection negotiation will time out.
+  const int64_t rpc_negotiation_timeout_ms_;
+
   // The SASL protocol name that is used for the SASL negotiation.
   const std::string sasl_proto_name_;
 
+  // Path to the Kerberos Keytab file for this server.
+  const std::string keytab_file_;
+
   // The ownership of the Messenger object is somewhat subtle. The pointer graph
   // looks like this:
   //

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc
index f0280c6..cd585b2 100644
--- a/be/src/kudu/rpc/negotiation.cc
+++ b/be/src/kudu/rpc/negotiation.cc
@@ -55,9 +55,6 @@ DEFINE_int32_hidden(rpc_negotiation_inject_delay_ms, 0,
              "the RPC negotiation process on the server side.");
 TAG_FLAG(rpc_negotiation_inject_delay_ms, unsafe);
 
-DECLARE_string(keytab_file);
-DECLARE_string(rpc_certificate_file);
-
 DEFINE_bool_hidden(rpc_encrypt_loopback_connections, false,
             "Whether to encrypt data transfer on RPC connections that stay within "
             "a single host. Encryption here is likely to offer no additional "
@@ -227,9 +224,10 @@ static Status DoServerNegotiation(Connection* conn,
                                   RpcAuthentication authentication,
                                   RpcEncryption encryption,
                                   const MonoTime& deadline) {
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   if (authentication == RpcAuthentication::REQUIRED &&
-      FLAGS_keytab_file.empty() &&
-      FLAGS_rpc_certificate_file.empty()) {
+      messenger->keytab_file().empty() &&
+      !messenger->tls_context().is_external_cert()) {
     return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be "
                                    "required unless Kerberos (--keytab_file) or external PKI "
                                    "(--rpc_certificate_file et al) are configured");
@@ -242,14 +240,13 @@ static Status DoServerNegotiation(Connection* conn,
   }
 
   // Create a new ServerNegotiation to handle the synchronous negotiation.
-  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
   ServerNegotiation server_negotiation(conn->release_socket(),
                                        &messenger->tls_context(),
                                        &messenger->token_verifier(),
                                        encryption,
                                        messenger->sasl_proto_name());
 
-  if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) {
+  if (authentication != RpcAuthentication::DISABLED && !messenger->keytab_file().empty()) {
     RETURN_NOT_OK(server_negotiation.EnableGSSAPI());
   }
   if (authentication != RpcAuthentication::REQUIRED) {

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.h b/be/src/kudu/rpc/negotiation.h
index 2ca459b..93f5a04 100644
--- a/be/src/kudu/rpc/negotiation.h
+++ b/be/src/kudu/rpc/negotiation.h
@@ -21,13 +21,12 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/monotime.h"
+#include "kudu/security/security_flags.h"
 
 namespace kudu {
 namespace rpc {
 
 class Connection;
-enum class RpcAuthentication;
-enum class RpcEncryption;
 
 enum class AuthenticationType {
   INVALID,
@@ -44,8 +43,8 @@ class Negotiation {
 
   // Perform negotiation for a connection (either server or client)
   static void RunNegotiation(const scoped_refptr<Connection>& conn,
-                             RpcAuthentication authentication,
-                             RpcEncryption encryption,
+                             security::RpcAuthentication authentication,
+                             security::RpcEncryption encryption,
                              MonoTime deadline);
  private:
   DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index e6a216c..573d244 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -70,11 +70,6 @@ using std::shared_ptr;
 using std::unique_ptr;
 using strings::Substitute;
 
-DEFINE_int64_hidden(rpc_negotiation_timeout_ms, 3000,
-             "Timeout for negotiating an RPC connection.");
-TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
-TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
-
 DEFINE_bool_hidden(rpc_reopen_outbound_connections, false,
             "Open a new connection to the server for every RPC call. "
             "If not enabled, an already existing connection to a "
@@ -474,7 +469,7 @@ Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>
 
   // Set a limit on how long the server will negotiate with a new client.
   MonoTime deadline = MonoTime::Now() +
-      MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms);
+      MonoDelta::FromMilliseconds(reactor()->messenger()->rpc_negotiation_timeout_ms());
 
   scoped_refptr<Trace> trace(new Trace());
   ADOPT_TRACE(trace.get());

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index ca7ccc4..204f98d 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -429,10 +429,17 @@ class RpcTestBase : public KuduTest {
  protected:
   std::shared_ptr<Messenger> CreateMessenger(const string &name,
                                              int n_reactors = 1,
-                                             bool enable_ssl = false) {
+                                             bool enable_ssl = false,
+                                             const std::string& rpc_certificate_file = "",
+                                             const std::string& rpc_private_key_file = "",
+                                             const std::string& rpc_ca_certificate_file = "",
+                                             const std::string& rpc_private_key_password_cmd = "") {
     MessengerBuilder bld(name);
 
     if (enable_ssl) {
+      bld.set_epki_cert_key_files(rpc_certificate_file, rpc_private_key_file);
+      bld.set_epki_certificate_authority_file(rpc_ca_certificate_file);
+      bld.set_epki_private_password_key_cmd(rpc_private_key_password_cmd);
       bld.enable_inbound_tls();
     }
 
@@ -554,8 +561,14 @@ class RpcTestBase : public KuduTest {
     LOG(INFO) << "status: " << s.ToString() << ", seconds elapsed: " << sw.elapsed().wall_seconds();
   }
 
-  void StartTestServer(Sockaddr *server_addr, bool enable_ssl = false) {
-    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl);
+  void StartTestServer(Sockaddr *server_addr,
+                       bool enable_ssl = false,
+                       const std::string& rpc_certificate_file = "",
+                       const std::string& rpc_private_key_file = "",
+                       const std::string& rpc_ca_certificate_file = "",
+                       const std::string& rpc_private_key_password_cmd = "") {
+    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, rpc_certificate_file,
+        rpc_private_key_file, rpc_ca_certificate_file, rpc_private_key_password_cmd);
   }
 
   void StartTestServerWithGeneratedCode(Sockaddr *server_addr, bool enable_ssl = false) {
@@ -564,7 +577,7 @@ class RpcTestBase : public KuduTest {
 
   void StartTestServerWithCustomMessenger(Sockaddr *server_addr,
       const std::shared_ptr<Messenger>& messenger, bool enable_ssl = false) {
-    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, messenger);
+    DoStartTestServer<GenericCalculatorService>(server_addr, enable_ssl, "", "", "", "", messenger);
   }
 
   // Start a simple socket listening on a local port, returning the address.
@@ -590,11 +603,17 @@ class RpcTestBase : public KuduTest {
   }
 
   template<class ServiceClass>
-  void DoStartTestServer(Sockaddr *server_addr, bool enable_ssl = false,
-      const std::shared_ptr<Messenger>& messenger = nullptr) {
+  void DoStartTestServer(Sockaddr *server_addr,
+                         bool enable_ssl = false,
+                         const std::string& rpc_certificate_file = "",
+                         const std::string& rpc_private_key_file = "",
+                         const std::string& rpc_ca_certificate_file = "",
+                         const std::string& rpc_private_key_password_cmd = "",
+                         const std::shared_ptr<Messenger>& messenger = nullptr) {
     if (!messenger) {
       server_messenger_ =
-          CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl);
+          CreateMessenger("TestServer", n_server_reactor_threads_, enable_ssl, rpc_certificate_file,
+              rpc_private_key_file, rpc_ca_certificate_file, rpc_private_key_password_cmd);
     } else {
       server_messenger_ = messenger;
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index c97423b..dc29323 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -43,10 +43,6 @@ METRIC_DECLARE_histogram(rpc_incoming_queue_time);
 
 DECLARE_bool(rpc_reopen_outbound_connections);
 DECLARE_int32(rpc_negotiation_inject_delay_ms);
-DECLARE_string(rpc_certificate_file);
-DECLARE_string(rpc_ca_certificate_file);
-DECLARE_string(rpc_private_key_file);
-DECLARE_string(rpc_private_key_password_cmd);
 
 using std::shared_ptr;
 using std::string;
@@ -161,17 +157,22 @@ TEST_P(TestRpc, TestCallWithChainCerts) {
   // We're only interested in running this test with TLS enabled.
   if (!enable_ssl) return;
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
   ASSERT_OK(security::CreateTestSSLCertSignedByChain(GetTestDataDirectory(),
-                                                     &FLAGS_rpc_certificate_file,
-                                                     &FLAGS_rpc_private_key_file,
-                                                     &FLAGS_rpc_ca_certificate_file));
+                                                     &rpc_certificate_file,
+                                                     &rpc_private_key_file,
+                                                     &rpc_ca_certificate_file));
   // Set up server.
   Sockaddr server_addr;
   StartTestServer(&server_addr, enable_ssl);
 
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
-  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl,
+      rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file));
+
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
@@ -188,20 +189,26 @@ TEST_P(TestRpc, TestCallWithPasswordProtectedKey) {
   // We're only interested in running this test with TLS enabled.
   if (!enable_ssl) return;
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
+  string rpc_private_key_password_cmd;
   string passwd;
   ASSERT_OK(security::CreateTestSSLCertWithEncryptedKey(GetTestDataDirectory(),
-                                                       &FLAGS_rpc_certificate_file,
-                                                       &FLAGS_rpc_private_key_file,
+                                                       &rpc_certificate_file,
+                                                       &rpc_private_key_file,
                                                        &passwd));
-  FLAGS_rpc_ca_certificate_file = FLAGS_rpc_certificate_file;
-  FLAGS_rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
+  rpc_ca_certificate_file = rpc_certificate_file;
+  rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
   // Set up server.
   Sockaddr server_addr;
   StartTestServer(&server_addr, enable_ssl);
 
   // Set up client.
   SCOPED_TRACE(strings::Substitute("Connecting to $0", server_addr.ToString()));
-  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl,
+      rpc_certificate_file, rpc_private_key_file, rpc_ca_certificate_file,
+      rpc_private_key_password_cmd));
   Proxy p(client_messenger, server_addr, server_addr.host(),
           GenericCalculatorService::static_service_name());
   ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
@@ -219,18 +226,23 @@ TEST_P(TestRpc, TestCallWithBadPasswordProtectedKey) {
   if (!enable_ssl) return;
   ::testing::FLAGS_gtest_death_test_style = "threadsafe";
 
+  string rpc_certificate_file;
+  string rpc_private_key_file;
+  string rpc_ca_certificate_file;
+  string rpc_private_key_password_cmd;
   string passwd;
   CHECK_OK(security::CreateTestSSLCertWithEncryptedKey(GetTestDataDirectory(),
-                                                       &FLAGS_rpc_certificate_file,
-                                                       &FLAGS_rpc_private_key_file,
+                                                       &rpc_certificate_file,
+                                                       &rpc_private_key_file,
                                                        &passwd));
   // Overwrite the password with an invalid one.
   passwd = "badpassword";
-  FLAGS_rpc_ca_certificate_file = FLAGS_rpc_certificate_file;
-  FLAGS_rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
+  rpc_ca_certificate_file = rpc_certificate_file;
+  rpc_private_key_password_cmd = strings::Substitute("echo $0", passwd);
   // Verify that the server fails to start up.
   Sockaddr server_addr;
-  ASSERT_DEATH(StartTestServer(&server_addr, enable_ssl), "failed to load private key file");
+  ASSERT_DEATH(StartTestServer(&server_addr, enable_ssl, rpc_certificate_file, rpc_private_key_file,
+      rpc_ca_certificate_file, rpc_private_key_password_cmd), "failed to load private key file");
 }
 
 // Test that connecting to an invalid server properly throws an error.

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_common.cc b/be/src/kudu/rpc/sasl_common.cc
index 9f14413..e169e81 100644
--- a/be/src/kudu/rpc/sasl_common.cc
+++ b/be/src/kudu/rpc/sasl_common.cc
@@ -25,14 +25,12 @@
 #include <string>
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <regex.h>
 #include <sasl/sasl.h>
 #include <sasl/saslplug.h>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/once.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/util/flag_tags.h"
@@ -43,8 +41,6 @@
 
 using std::set;
 
-DECLARE_string(keytab_file);
-
 namespace kudu {
 namespace rpc {
 
@@ -62,6 +58,9 @@ static bool sasl_is_initialized = false;
 // If true, then we expect someone else has initialized SASL.
 static bool g_disable_sasl_init = false;
 
+// If true, we expect kerberos to be enabled.
+static bool has_kerberos_keytab = false;
+
 // Output Sasl messages.
 // context: not used.
 // level: logging level.
@@ -211,9 +210,11 @@ static bool SaslMutexImplementationProvided() {
 
 // Actually perform the initialization for the SASL subsystem.
 // Meant to be called via GoogleOnceInit().
-static void DoSaslInit() {
+static void DoSaslInit(bool kerberos_keytab_provided) {
   VLOG(3) << "Initializing SASL library";
 
+  has_kerberos_keytab = kerberos_keytab_provided;
+
   bool sasl_initialized = SaslIsInitialized();
   if (sasl_initialized && !g_disable_sasl_init) {
     LOG(WARNING) << "SASL was initialized prior to Kudu's initialization. Skipping "
@@ -264,10 +265,12 @@ Status DisableSaslInitialization() {
   return Status::OK();
 }
 
-Status SaslInit() {
+Status SaslInit(bool kerberos_keytab_provided) {
   // Only execute SASL initialization once
-  static GoogleOnceType once = GOOGLE_ONCE_INIT;
-  GoogleOnceInit(&once, &DoSaslInit);
+  static std::once_flag once;
+  std::call_once(once, DoSaslInit, kerberos_keytab_provided);
+  DCHECK_EQ(kerberos_keytab_provided, has_kerberos_keytab);
+
   return sasl_init_status;
 }
 
@@ -319,10 +322,9 @@ Status WrapSaslCall(sasl_conn_t* conn, const std::function<int()>& call) {
   g_auth_failure_capture = &err;
 
   // Take the 'kerberos_reinit_lock' here to avoid a possible race with ticket renewal.
-  bool kerberos_supported = !FLAGS_keytab_file.empty();
-  if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadLock();
+  if (has_kerberos_keytab) kudu::security::KerberosReinitLock()->ReadLock();
   int rc = call();
-  if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadUnlock();
+  if (has_kerberos_keytab) kudu::security::KerberosReinitLock()->ReadUnlock();
   g_auth_failure_capture = nullptr;
 
   switch (rc) {

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/sasl_common.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_common.h b/be/src/kudu/rpc/sasl_common.h
index 6022f9e..9f16006 100644
--- a/be/src/kudu/rpc/sasl_common.h
+++ b/be/src/kudu/rpc/sasl_common.h
@@ -64,7 +64,7 @@ struct SaslMechanism {
 //
 // This function is thread safe and uses a static lock.
 // This function should NOT be called during static initialization.
-Status SaslInit() WARN_UNUSED_RESULT;
+Status SaslInit(bool kerberos_keytab_provided = false) WARN_UNUSED_RESULT;
 
 // Disable Kudu's initialization of SASL. See equivalent method in client.h.
 Status DisableSaslInitialization() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/server_negotiation.cc b/be/src/kudu/rpc/server_negotiation.cc
index 9878dd9..fc4719c 100644
--- a/be/src/kudu/rpc/server_negotiation.cc
+++ b/be/src/kudu/rpc/server_negotiation.cc
@@ -369,8 +369,6 @@ Status ServerNegotiation::ValidateConnectionHeader(faststring* recv_buf) {
 
 // calls sasl_server_init() and sasl_server_new()
 Status ServerNegotiation::InitSaslServer() {
-  RETURN_NOT_OK(SaslInit());
-
   // TODO(unknown): Support security flags.
   unsigned secflags = 0;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/server_negotiation.h b/be/src/kudu/rpc/server_negotiation.h
index 4cafeab..716d363 100644
--- a/be/src/kudu/rpc/server_negotiation.h
+++ b/be/src/kudu/rpc/server_negotiation.h
@@ -29,6 +29,7 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/socket.h"
@@ -58,7 +59,7 @@ class ServerNegotiation {
   ServerNegotiation(std::unique_ptr<Socket> socket,
                     const security::TlsContext* tls_context,
                     const security::TokenVerifier* token_verifier,
-                    RpcEncryption encryption,
+                    security::RpcEncryption encryption,
                     std::string sasl_proto_name);
 
   // Enable PLAIN authentication.
@@ -220,7 +221,7 @@ class ServerNegotiation {
   // TLS state.
   const security::TlsContext* tls_context_;
   security::TlsHandshake tls_handshake_;
-  const RpcEncryption encryption_;
+  const security::RpcEncryption encryption_;
   bool tls_negotiated_;
 
   // TSK state.

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
index 4c88fe3..8736e66 100644
--- a/be/src/kudu/security/CMakeLists.txt
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -62,6 +62,7 @@ set(SECURITY_SRCS
   init.cc
   openssl_util.cc
   ${PORTED_X509_CHECK_HOST_CC}
+  security_flags.cc
   simple_acl.cc
   tls_context.cc
   tls_handshake.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/init.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.cc b/be/src/kudu/security/init.cc
index 8c448c2..7674c7e 100644
--- a/be/src/kudu/security/init.cc
+++ b/be/src/kudu/security/init.cc
@@ -40,9 +40,6 @@
 
 #include "common/config.h"
 
-DECLARE_string(keytab_file);
-TAG_FLAG(keytab_file, stable);
-
 #ifndef __APPLE__
 static constexpr bool kDefaultSystemAuthToLocal = true;
 #else
@@ -458,12 +455,12 @@ boost::optional<string> GetLoggedInUsernameFromKeytab() {
   return g_kinit_ctx->username_str();
 }
 
-Status InitKerberosForServer(const std::string& raw_principal, const std::string& krb5ccname,
-    bool disable_krb5_replay_cache) {
-  if (FLAGS_keytab_file.empty()) return Status::OK();
+Status InitKerberosForServer(const std::string& raw_principal, const std::string& keytab_file,
+    const std::string& krb5ccname, bool disable_krb5_replay_cache) {
+  if (keytab_file.empty()) return Status::OK();
 
   setenv("KRB5CCNAME", krb5ccname.c_str(), 1);
-  setenv("KRB5_KTNAME", FLAGS_keytab_file.c_str(), 1);
+  setenv("KRB5_KTNAME", keytab_file.c_str(), 1);
 
   if (disable_krb5_replay_cache) {
     // KUDU-1897: disable the Kerberos replay cache. The KRPC protocol includes a
@@ -477,7 +474,7 @@ Status InitKerberosForServer(const std::string& raw_principal, const std::string
   string configured_principal;
   RETURN_NOT_OK(GetConfiguredPrincipal(raw_principal, &configured_principal));
   RETURN_NOT_OK_PREPEND(g_kinit_ctx->Kinit(
-      FLAGS_keytab_file, configured_principal), "unable to kinit");
+      keytab_file, configured_principal), "unable to kinit");
 
   g_kerberos_reinit_lock = new RWMutex(RWMutex::Priority::PREFER_WRITING);
   scoped_refptr<Thread> reacquire_thread;

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/init.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/init.h b/be/src/kudu/security/init.h
index 656efb7..34f29b9 100644
--- a/be/src/kudu/security/init.h
+++ b/be/src/kudu/security/init.h
@@ -36,10 +36,13 @@ static const std::string kKrb5CCName = "MEMORY:kudu";
 // the '--keytab_file' command line flag.
 // 'raw_principal' is the principal to Kinit with after calling GetConfiguredPrincipal()
 // on it.
+// 'keytab_file' is the path to the kerberos keytab file. If it's an empty string, kerberos
+// will not be initialized.
 // 'krb5ccname' is passed into the KRB5CCNAME env var.
 // 'disable_krb5_replay_cache' if set to true, disables the kerberos replay cache by setting
 // the KRB5RCACHETYPE env var to "none".
 Status InitKerberosForServer(const std::string& raw_principal,
+                             const std::string& keytab_file,
                              const std::string& krb5ccname = kKrb5CCName,
                              bool disable_krb5_replay_cache = true);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/security_flags.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/security_flags.cc b/be/src/kudu/security/security_flags.cc
new file mode 100644
index 0000000..acdd662
--- /dev/null
+++ b/be/src/kudu/security/security_flags.cc
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/security_flags.h"
+
+namespace kudu {
+namespace security {
+
+// This is the "modern compatibility" cipher list of the Mozilla Security
+// Server Side TLS recommendations, accessed Feb. 2017, with the addition of
+// the non ECDH/DH AES cipher suites from the "intermediate compatibility"
+// list. These additional ciphers maintain compatibility with RHEL 6.5 and
+// below. The DH AES ciphers are not included since we are not configured to
+// use DH key agreement.
+const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsCiphers =
+                                   "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
+                                   "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
+                                   "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
+                                   "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
+                                   "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:"
+                                   "AES256-GCM-SHA384:AES128-GCM-SHA256:"
+                                   "AES256-SHA256:AES128-SHA256:"
+                                   "AES256-SHA:AES128-SHA";
+
+const char* const SecurityDefaults::SecurityDefaults::kDefaultTlsMinVersion = "TLSv1";
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/security_flags.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/security_flags.h b/be/src/kudu/security/security_flags.h
new file mode 100644
index 0000000..e64536d
--- /dev/null
+++ b/be/src/kudu/security/security_flags.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/util/flags.h"
+
+namespace kudu {
+namespace security {
+
+// Authentication configuration for RPC connections.
+typedef TriStateFlag RpcAuthentication;
+
+// Encryption configuration for RPC connections.
+typedef TriStateFlag RpcEncryption;
+
+struct SecurityDefaults {
+  static const char* const kDefaultTlsCiphers;
+  static const char* const kDefaultTlsMinVersion;
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/test/mini_kdc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/test/mini_kdc-test.cc b/be/src/kudu/security/test/mini_kdc-test.cc
index e93c10d..441a85d 100644
--- a/be/src/kudu/security/test/mini_kdc-test.cc
+++ b/be/src/kudu/security/test/mini_kdc-test.cc
@@ -17,8 +17,7 @@
 
 #include <string>
 
-#include <boost/optional.hpp>
-#include <gflags/gflags.h>
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest.h>
 
 #include "kudu/security/init.h"
@@ -28,9 +27,6 @@
 
 using std::string;
 
-DECLARE_string(keytab_file);
-DECLARE_string(principal);
-
 namespace kudu {
 
 class MiniKdcTest : public KuduTest {};
@@ -75,8 +71,7 @@ TEST_F(MiniKdcTest, TestBasicOperation) {
 
   // Test programmatic keytab login.
   kdc.SetKrb5Environment();
-  FLAGS_keytab_file = kt_path;
-  ASSERT_OK(security::InitKerberosForServer(kSPN));
+  ASSERT_OK(security::InitKerberosForServer(kSPN, kt_path));
   ASSERT_EQ("kudu/foo.example.com@KRBTEST.COM", *security::GetLoggedInPrincipalFromKeytab());
 
   // Test principal canonicalization.

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/tls_context.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.cc b/be/src/kudu/security/tls_context.cc
index 299e416..b6ec57f 100644
--- a/be/src/kudu/security/tls_context.cc
+++ b/be/src/kudu/security/tls_context.cc
@@ -34,6 +34,7 @@
 #include "kudu/security/crypto.h"
 #include "kudu/security/init.h"
 #include "kudu/security/openssl_util.h"
+#include "kudu/security/security_flags.h"
 #include "kudu/security/tls_handshake.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/net/net_util.h"
@@ -50,32 +51,6 @@ DEFINE_int32_hidden(ipki_server_key_size, 2048,
              "is used for TLS connections to and from clients and other servers.");
 TAG_FLAG(ipki_server_key_size, experimental);
 
-DEFINE_string_hidden(rpc_tls_ciphers,
-              // This is the "modern compatibility" cipher list of the Mozilla Security
-              // Server Side TLS recommendations, accessed Feb. 2017, with the addition of
-              // the non ECDH/DH AES cipher suites from the "intermediate compatibility"
-              // list. These additional ciphers maintain compatibility with RHEL 6.5 and
-              // below. The DH AES ciphers are not included since we are not configured to
-              // use DH key agreement.
-              "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:"
-              "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:"
-              "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:"
-              "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:"
-              "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256"
-              "AES256-GCM-SHA384:AES128-GCM-SHA256:"
-              "AES256-SHA256:AES128-SHA256:"
-              "AES256-SHA:AES128-SHA",
-              "The cipher suite preferences to use for TLS-secured RPC connections. "
-              "Uses the OpenSSL cipher preference list format. See man (1) ciphers "
-              "for more information.");
-TAG_FLAG(rpc_tls_ciphers, advanced);
-
-DEFINE_string_hidden(rpc_tls_min_protocol, "TLSv1",
-              "The minimum protocol version to allow when for securing RPC "
-              "connections with TLS. May be one of 'TLSv1', 'TLSv1.1', or "
-              "'TLSv1.2'.");
-TAG_FLAG(rpc_tls_min_protocol, advanced);
-
 namespace kudu {
 namespace security {
 
@@ -89,7 +64,19 @@ template<> struct SslTypeTraits<X509_STORE_CTX> {
 };
 
 TlsContext::TlsContext()
-    : lock_(RWMutex::Priority::PREFER_READING),
+    : tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
+      lock_(RWMutex::Priority::PREFER_READING),
+      trusted_cert_count_(0),
+      has_cert_(false),
+      is_external_cert_(false) {
+  security::InitializeOpenSSL();
+}
+
+TlsContext::TlsContext(std::string tls_ciphers, std::string tls_min_protocol)
+    : tls_ciphers_(std::move(tls_ciphers)),
+      tls_min_protocol_(std::move(tls_min_protocol)),
+      lock_(RWMutex::Priority::PREFER_READING),
       trusted_cert_count_(0),
       has_cert_(false),
       is_external_cert_(false) {
@@ -120,7 +107,7 @@ Status TlsContext::Init() {
   //   https://tools.ietf.org/html/rfc7525#section-3.3
   auto options = SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION;
 
-  if (boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1.2")) {
+  if (boost::iequals(tls_min_protocol_, "TLSv1.2")) {
 #if OPENSSL_VERSION_NUMBER < 0x10001000L
     return Status::InvalidArgument(
         "--rpc_tls_min_protocol=TLSv1.2 is not be supported on this platform. "
@@ -128,7 +115,7 @@ Status TlsContext::Init() {
 #else
     options |= SSL_OP_NO_TLSv1 | SSL_OP_NO_TLSv1_1;
 #endif
-  } else if (boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1.1")) {
+  } else if (boost::iequals(tls_min_protocol_, "TLSv1.1")) {
 #if OPENSSL_VERSION_NUMBER < 0x10001000L
     return Status::InvalidArgument(
         "--rpc_tls_min_protocol=TLSv1.1 is not be supported on this platform. "
@@ -136,15 +123,15 @@ Status TlsContext::Init() {
 #else
     options |= SSL_OP_NO_TLSv1;
 #endif
-  } else if (!boost::iequals(FLAGS_rpc_tls_min_protocol, "TLSv1")) {
+  } else if (!boost::iequals(tls_min_protocol_, "TLSv1")) {
     return Status::InvalidArgument("unknown value provided for --rpc_tls_min_protocol flag",
-                                   FLAGS_rpc_tls_min_protocol);
+                                   tls_min_protocol_);
   }
 
   SSL_CTX_set_options(ctx_.get(), options);
 
   OPENSSL_RET_NOT_OK(
-      SSL_CTX_set_cipher_list(ctx_.get(), FLAGS_rpc_tls_ciphers.c_str()),
+      SSL_CTX_set_cipher_list(ctx_.get(), tls_ciphers_.c_str()),
       "failed to set TLS ciphers");
 
   // Enable ECDH curves. For OpenSSL 1.1.0 and up, this is done automatically.

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/security/tls_context.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_context.h b/be/src/kudu/security/tls_context.h
index 1785471..fb0685a 100644
--- a/be/src/kudu/security/tls_context.h
+++ b/be/src/kudu/security/tls_context.h
@@ -69,6 +69,8 @@ class TlsContext {
 
   TlsContext();
 
+  TlsContext(std::string tls_ciphers, std::string tls_min_protocol);
+
   ~TlsContext() = default;
 
   Status Init() WARN_UNUSED_RESULT;
@@ -172,6 +174,14 @@ class TlsContext {
 
   Status VerifyCertChainUnlocked(const Cert& cert) WARN_UNUSED_RESULT;
 
+  // The cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  std::string tls_ciphers_;
+
+  // The minimum protocol version to allow when securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  std::string tls_min_protocol_;
+
   // Protects all members.
   //
   // Taken in write mode when any changes are modifying the underlying SSL_CTX

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/util/flags.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flags.cc b/be/src/kudu/util/flags.cc
index 0047b96..090835e 100644
--- a/be/src/kudu/util/flags.cc
+++ b/be/src/kudu/util/flags.cc
@@ -27,6 +27,7 @@
 #include <sys/stat.h>
 #include <sys/types.h>
 
+#include <boost/algorithm/string/predicate.hpp>
 #include <gflags/gflags.h>
 #include <gperftools/heap-profiler.h>
 
@@ -549,4 +550,20 @@ GFlagsMap GetFlagsMap() {
   return flags_by_name;
 }
 
+Status ParseTriState(const char* flag_name, const std::string& flag_value,
+    TriStateFlag* tri_state) {
+  if (boost::iequals(flag_value, "required")) {
+    *tri_state = TriStateFlag::REQUIRED;
+  } else if (boost::iequals(flag_value, "optional")) {
+    *tri_state = TriStateFlag::OPTIONAL;
+  } else if (boost::iequals(flag_value, "disabled")) {
+    *tri_state = TriStateFlag::DISABLED;
+  } else {
+    return Status::InvalidArgument(strings::Substitute(
+          "$0 flag must be one of 'required', 'optional', or 'disabled'",
+          flag_name));
+  }
+  return Status::OK();
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/kudu/util/flags.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/flags.h b/be/src/kudu/util/flags.h
index ae20564..7b55290 100644
--- a/be/src/kudu/util/flags.h
+++ b/be/src/kudu/util/flags.h
@@ -21,7 +21,7 @@
 #include <string>
 #include <unordered_map>
 
-#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 
@@ -70,5 +70,14 @@ std::string GetNonDefaultFlags(const GFlagsMap& default_flags);
 
 GFlagsMap GetFlagsMap();
 
+enum class TriStateFlag {
+  DISABLED,
+  OPTIONAL,
+  REQUIRED,
+};
+
+Status ParseTriState(const char* flag_name, const std::string& flag_value,
+    TriStateFlag* tri_state);
+
 } // namespace kudu
 #endif /* KUDU_UTIL_FLAGS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/6f2ebadf/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index d5a1b32..b333054 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -844,7 +844,7 @@ Status SaslAuthProvider::Start() {
     if (FLAGS_use_kudu_kinit) {
       // Starts a thread that periodically does a 'kinit'. The thread lives as long as the
       // process does.
-      KUDU_RETURN_IF_ERROR(kudu::security::InitKerberosForServer(principal_,
+      KUDU_RETURN_IF_ERROR(kudu::security::InitKerberosForServer(principal_, keytab_file_,
           KRB5CCNAME_PATH, false), "Could not init kerberos");
     } else {
       Promise<Status> first_kinit;