You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/02/02 18:51:32 UTC

[08/19] impala git commit: IMPALA-5054: [SECURITY] Enable KRPC w/ TLS in Impala

IMPALA-5054: [SECURITY] Enable KRPC w/ TLS in Impala

KRPC has some flags that turn on TLS. This patch sets those to enable
TLS communication.

Tests are added to rpc-mgr-test.

TODO: Kudu kerberos testing is disabled. Will re-enable as part of IMPALA-6448.

Change-Id: I9a14a44fdea9ab668f3714eb69fdb188bce38f5a
Reviewed-on: http://gerrit.cloudera.org:8080/8439
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68b7c8b8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68b7c8b8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68b7c8b8

Branch: refs/heads/2.x
Commit: 68b7c8b8aabd1a65f325f971dc861f6cb2eff5ad
Parents: 885776e
Author: Sailesh Mukil <sa...@apache.org>
Authored: Sun Oct 29 18:38:57 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalogd-main.cc            |   9 +-
 be/src/rpc/authentication-test.cc          |   5 +-
 be/src/rpc/rpc-mgr-test.cc                 | 274 +++++++++++++++++++++---
 be/src/rpc/rpc-mgr.cc                      |  31 ++-
 be/src/rpc/rpc-mgr.h                       |   6 +
 be/src/rpc/thrift-server.cc                |  14 --
 be/src/rpc/thrift-server.h                 |   4 -
 be/src/runtime/exec-env.cc                 |   3 +-
 be/src/service/impala-server.cc            |   9 +-
 be/src/statestore/statestore-subscriber.cc |   5 +-
 be/src/statestore/statestore.cc            |   5 +-
 be/src/statestore/statestored-main.cc      |   3 +-
 be/src/testutil/in-process-servers.cc      |   3 +-
 be/src/util/openssl-util.cc                |  23 ++
 be/src/util/openssl-util.h                 |   9 +
 15 files changed, 332 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index f98a406..95e5dcd 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -29,14 +29,15 @@
 #include "rpc/thrift-server.h"
 #include "runtime/mem-tracker.h"
 #include "service/fe-support.h"
+#include "util/common-metrics.h"
 #include "util/debug-util.h"
 #include "util/jni-util.h"
+#include "util/default-path-handlers.h"
+#include "util/memory-metrics.h"
 #include "util/metrics.h"
-#include "util/common-metrics.h"
 #include "util/network-util.h"
-#include "util/memory-metrics.h"
+#include "util/openssl-util.h"
 #include "util/webserver.h"
-#include "util/default-path-handlers.h"
 
 DECLARE_string(classpath);
 DECLARE_string(principal);
@@ -95,7 +96,7 @@ int CatalogdMain(int argc, char** argv) {
   ThriftServer* server;
   ThriftServerBuilder builder("CatalogService", processor, FLAGS_catalog_service_port);
 
-  if (EnableInternalSslConnections()) {
+  if (IsInternalTlsConfigured()) {
     SSLProtocol ssl_version;
     ABORT_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/authentication-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication-test.cc b/be/src/rpc/authentication-test.cc
index 2b204ad..0dacdee 100644
--- a/be/src/rpc/authentication-test.cc
+++ b/be/src/rpc/authentication-test.cc
@@ -22,6 +22,7 @@
 #include "rpc/thrift-server.h"
 #include "util/auth-util.h"
 #include "util/network-util.h"
+#include "util/openssl-util.h"
 #include "util/thread.h"
 
 #include <ldap.h>
@@ -32,6 +33,7 @@ DECLARE_string(keytab_file);
 DECLARE_string(principal);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
 DECLARE_string(internal_principals_whitelist);
 
 // These are here so that we can grab them early in main() - the kerberos
@@ -172,7 +174,8 @@ TEST(Auth, KerbAndSslEnabled) {
   ASSERT_OK(GetHostname(&hostname));
   FLAGS_ssl_client_ca_certificate = "some_path";
   FLAGS_ssl_server_certificate = "some_path";
-  ASSERT_TRUE(EnableInternalSslConnections());
+  FLAGS_ssl_private_key = "some_path";
+  ASSERT_TRUE(IsInternalTlsConfigured());
   SaslAuthProvider sa_internal(true);
   ASSERT_OK(
       sa_internal.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 441619b..7effda9 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -28,8 +28,10 @@
 #include "rpc/auth-provider.h"
 #include "testutil/gtest-util.h"
 #include "testutil/mini-kdc-wrapper.h"
+#include "testutil/scoped-flag-setter.h"
 #include "util/counting-barrier.h"
 #include "util/network-util.h"
+#include "util/openssl-util.h"
 #include "util/test-info.h"
 
 #include "gen-cpp/rpc_test.proxy.h"
@@ -51,6 +53,11 @@ DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
 DECLARE_string(hostname);
 
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
 
 // The path of the current executable file that is required for passing into the SASL
 // library as the 'application name'.
@@ -68,9 +75,41 @@ int GetServerPort() {
 
 static int kdc_port = GetServerPort();
 
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
 #define PAYLOAD_SIZE (4096)
 
 template <class T> class RpcMgrTestBase : public T {
+ public:
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
  protected:
   TNetworkAddress krpc_address_;
   RpcMgr rpc_mgr_;
@@ -86,20 +125,6 @@ template <class T> class RpcMgrTestBase : public T {
     rpc_mgr_.Shutdown();
   }
 
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
  private:
   int32_t payload_[PAYLOAD_SIZE];
 };
@@ -191,35 +216,35 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
   }
 };
 
-// TODO: Disabled 'USE_KUDU_KERBEROS' and 'USE_IMPALA_KERBEROS' due to IMPALA-6268.
-// Reenable after fixing.
+// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
+// Re-enable after fixing.
 INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
                         RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_OFF,
-                                          USE_KUDU_KERBEROS,
-                                          USE_IMPALA_KERBEROS));
+                        ::testing::Values(KERBEROS_OFF));
 
-TEST_P(RpcMgrKerberizedTest, MultipleServices) {
+template <class T>
+Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
   // Test that a service can be started, and will respond to requests.
   unique_ptr<ServiceIf> ping_impl(
-      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl)));
+      new PingServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl)));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
   unique_ptr<ServiceIf> scan_mem_impl(
-      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+      new ScanMemServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl)));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
 
   unique_ptr<PingServiceProxy> ping_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy));
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
 
   RpcController controller;
   srand(0);
@@ -230,17 +255,200 @@ TEST_P(RpcMgrKerberizedTest, MultipleServices) {
     if (random() % 2 == 0) {
       PingRequestPB request;
       PingResponsePB response;
-      kudu::Status status = ping_proxy->Ping(request, &response, &controller);
-      ASSERT_TRUE(status.ok());
-      ASSERT_EQ(response.int_response(), 42);
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+          return Status(Substitute(
+              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+                  response.int_response()));
+      }
     } else {
       ScanMemRequestPB request;
       ScanMemResponsePB response;
-      SetupScanMemRequest(&request, &controller);
-      kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller);
-      ASSERT_TRUE(status.ok());
+      test_base->SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+          "unable to execute ScanMem() RPC.");
     }
   }
+
+  return Status::OK();
+}
+
+
+TEST_F(RpcMgrTest, MultipleServices) {
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+}
+
+TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+  // TODO: We're starting a seperate RpcMgr here instead of configuring
+  // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
+  // new gtest params to turn on TLS which needs to be a coordinated change across
+  // rpc-mgr-test and thrift-server-test.
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  // Enable TLS.
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  ASSERT_OK(tls_rpc_mgr.Init());
+
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with a misconfigured TLS certificate and verify that an error is thrown.
+TEST_F(RpcMgrTest, BadCertificateTls) {
+
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, "unknown");
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with a bad password command for the password protected private key.
+TEST_F(RpcMgrTest, BadPasswordTls) {
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(
+          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  auto password_cmd =
+      ScopedFlagSetter<string>::Make(
+          &FLAGS_ssl_private_key_password_cmd, "echo badpassword");
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with a correct password command for the password protected private key.
+TEST_F(RpcMgrTest, CorrectPasswordTls) {
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(
+          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  auto password_cmd =
+      ScopedFlagSetter<string>::Make(
+          &FLAGS_ssl_private_key_password_cmd, "echo password");
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with a bad TLS cipher and verify that an error is thrown.
+TEST_F(RpcMgrTest, BadCiphersTls) {
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  auto cipher =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher");
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with a valid TLS cipher.
+TEST_F(RpcMgrTest, ValidCiphersTls) {
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  auto cipher =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, TLS1_0_COMPATIBLE_CIPHER);
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+// Test with multiple valid TLS ciphers.
+TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
+  const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
+      TLS1_0_COMPATIBLE_CIPHER_2);
+  auto cert_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+  auto pkey_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+  auto ca_flag =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  auto cipher =
+      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list);
+
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
 }
 
 TEST_F(RpcMgrTest, SlowCallback) {

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index d4e8fe1..c70c117 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -27,6 +27,7 @@
 #include "util/auth-util.h"
 #include "util/cpu-info.h"
 #include "util/network-util.h"
+#include "util/openssl-util.h"
 
 #include "common/names.h"
 
@@ -43,6 +44,15 @@ using kudu::Sockaddr;
 DECLARE_string(hostname);
 DECLARE_string(principal);
 DECLARE_string(be_principal);
+DECLARE_string(keytab_file);
+
+// Impala's TLS flags.
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+DECLARE_string(ssl_minimum_version);
 
 // Defined in kudu/rpc/rpcz_store.cc
 DECLARE_int32(rpc_duration_too_long_ms);
@@ -82,9 +92,23 @@ Status RpcMgr::Init() {
     RETURN_IF_ERROR(ParseKerberosPrincipal(internal_principal, &service_name,
         &unused_hostname, &unused_realm));
     bld.set_sasl_proto_name(service_name);
-    // TODO: Once the Messenger can take more options pertaining to 'rpc_authentication'
-    // and more, we need to explicitly set those options here. (KUDU-2228)
+    bld.set_rpc_authentication("required");
+    bld.set_keytab_file(FLAGS_keytab_file);
+  }
+
+  if (use_tls_) {
+    LOG (INFO) << "Initing RpcMgr with TLS";
+    bld.set_epki_cert_key_files(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
+    bld.set_epki_certificate_authority_file(FLAGS_ssl_client_ca_certificate);
+    bld.set_epki_private_password_key_cmd(FLAGS_ssl_private_key_password_cmd);
+    if (!FLAGS_ssl_cipher_list.empty()) {
+      bld.set_rpc_tls_ciphers(FLAGS_ssl_cipher_list);
+    }
+    bld.set_rpc_tls_min_protocol(FLAGS_ssl_minimum_version);
+    bld.set_rpc_encryption("required");
+    bld.enable_inbound_tls();
   }
+
   KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger");
   return Status::OK();
 }
@@ -97,8 +121,7 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
       new ImpalaServicePool(std::move(service_ptr),
           messenger_->metric_entity(), service_queue_depth);
   // Start the thread pool first before registering the service in case the startup fails.
-  RETURN_IF_ERROR(
-      service_pool->Init(num_service_threads));
+  RETURN_IF_ERROR(service_pool->Init(num_service_threads));
   KUDU_RETURN_IF_ERROR(
       messenger_->RegisterService(service_pool->service_name(), service_pool),
       "Could not register service");

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 26dbae0..b2099f2 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -97,6 +97,8 @@ namespace impala {
 /// port is configurable via FLAGS_acceptor_threads.
 class RpcMgr {
  public:
+  RpcMgr(bool use_tls = false) : use_tls_(use_tls) {}
+
   /// Initializes the reactor threads, and prepares for sending outbound RPC requests.
   Status Init() WARN_UNUSED_RESULT;
 
@@ -176,6 +178,10 @@ class RpcMgr {
 
   /// True after StartServices() completes.
   bool services_started_ = false;
+
+  /// True if TLS is configured for communication between Impala backends. messenger_ will
+  /// be configured to use TLS if this is set.
+  const bool use_tls_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 75ad424..ded710e 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -63,9 +63,6 @@ DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
 
 DECLARE_string(principal);
 DECLARE_string(keytab_file);
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_cipher_list);
 
 namespace impala {
 
@@ -103,17 +100,6 @@ bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) {
   }
 }
 
-bool EnableInternalSslConnections() {
-  // Enable SSL between servers only if both the client validation certificate and the
-  // server certificate are specified. 'Client' here means clients that are used by Impala
-  // services to contact other Impala services (as distinct from user clients of Impala
-  // like the shell), and 'servers' are the processes that serve those clients. The server
-  // needs a certificate to demonstrate it is who the client thinks it is; the client
-  // needs a certificate to validate that assertion from the server.
-  return !FLAGS_ssl_client_ca_certificate.empty() &&
-      !FLAGS_ssl_server_certificate.empty();
-}
-
 // Helper class that starts a server in a separate thread, and handles
 // the inter-thread communication to monitor whether it started
 // correctly.

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 588904f..d95d90e 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -341,10 +341,6 @@ struct SSLProtoVersions {
   static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol);
 };
 
-// Returns true if, per the process configuration flags, server<->server communications
-// should use SSL.
-bool EnableInternalSslConnections();
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0e5b0f6..0551848 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -58,6 +58,7 @@
 #include "util/memory-metrics.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
+#include "util/openssl-util.h"
 #include "util/parse-util.h"
 #include "util/pretty-printer.h"
 #include "util/thread-pool.h"
@@ -180,7 +181,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
     VLOG_QUERY << "Using KRPC.";
     // KRPC relies on resolved IP address. It's set in StartServices().
     krpc_address_.__set_port(krpc_port);
-    rpc_mgr_.reset(new RpcMgr());
+    rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
     stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
   } else {
     stream_mgr_.reset(new DataStreamMgr(metrics_.get()));

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a62130c..af79180 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -67,6 +67,7 @@
 #include "util/impalad-metrics.h"
 #include "util/lineage-util.h"
 #include "util/network-util.h"
+#include "util/openssl-util.h"
 #include "util/parse-util.h"
 #include "util/redactor.h"
 #include "util/runtime-profile-counters.h"
@@ -2025,7 +2026,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
   if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog();
 
   SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
-  if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) {
+  if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
     RETURN_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
   }
@@ -2041,7 +2042,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
 
     ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port);
 
-    if (EnableInternalSslConnections()) {
+    if (IsInternalTlsConfigured()) {
       LOG(INFO) << "Enabling SSL for backend";
       be_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
           .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
@@ -2067,7 +2068,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
       beeswax_processor->setEventHandler(event_handler);
       ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
 
-      if (!FLAGS_ssl_server_certificate.empty()) {
+      if (IsExternalTlsConfigured()) {
         LOG(INFO) << "Enabling SSL for Beeswax";
         builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
               .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
@@ -2094,7 +2095,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
 
       ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
 
-      if (!FLAGS_ssl_server_certificate.empty()) {
+      if (IsExternalTlsConfigured()) {
         LOG(INFO) << "Enabling SSL for HiveServer2";
         builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
               .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 99da183..e58c177 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -32,8 +32,9 @@
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-util.h"
 #include "statestore/statestore-service-client-wrapper.h"
-#include "util/time.h"
 #include "util/debug-util.h"
+#include "util/openssl-util.h"
+#include "util/time.h"
 
 #include "common/names.h"
 
@@ -197,7 +198,7 @@ Status StatestoreSubscriber::Start() {
 
     ThriftServerBuilder builder(
         "StatestoreSubscriber", processor, heartbeat_address_.port);
-    if (EnableInternalSslConnections()) {
+    if (IsInternalTlsConfigured()) {
       SSLProtocol ssl_version;
       RETURN_IF_ERROR(
           SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index b135e38..0f72e58 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -29,6 +29,7 @@
 #include "statestore/statestore-subscriber-client-wrapper.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
+#include "util/openssl-util.h"
 #include "util/time.h"
 #include "util/uid-util.h"
 #include "util/webserver.h"
@@ -225,11 +226,11 @@ Statestore::Statestore(MetricGroup* metrics)
     update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
-        EnableInternalSslConnections())),
+        IsInternalTlsConfigured())),
     heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
         FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "",
-        EnableInternalSslConnections())),
+        IsInternalTlsConfigured())),
     thrift_iface_(new StatestoreThriftIf(this)),
     failure_detector_(new MissedHeartbeatFailureDetector(
         FLAGS_statestore_max_missed_heartbeats,

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1a11237..633d449 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -31,6 +31,7 @@
 #include "util/common-metrics.h"
 #include "util/debug-util.h"
 #include "util/metrics.h"
+#include "util/openssl-util.h"
 #include "util/memory-metrics.h"
 #include "util/webserver.h"
 #include "util/default-path-handlers.h"
@@ -91,7 +92,7 @@ int StatestoredMain(int argc, char** argv) {
 
   ThriftServer* server;
   ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port);
-  if (EnableInternalSslConnections()) {
+  if (IsInternalTlsConfigured()) {
     SSLProtocol ssl_version;
     ABORT_IF_ERROR(
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 4817d7f..7ff44a8 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -27,6 +27,7 @@
 #include "util/webserver.h"
 #include "util/default-path-handlers.h"
 #include "util/metrics.h"
+#include "util/openssl-util.h"
 #include "runtime/exec-env.h"
 #include "service/impala-server.h"
 
@@ -149,7 +150,7 @@ Status InProcessStatestore::Start() {
       new StatestoreServiceProcessor(statestore_->thrift_iface()));
 
   ThriftServerBuilder builder("StatestoreService", processor, statestore_port_);
-  if (EnableInternalSslConnections()) {
+  if (IsInternalTlsConfigured()) {
     LOG(INFO) << "Enabling SSL for Statestore";
     builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.cc b/be/src/util/openssl-util.cc
index 264a49a..69dc676 100644
--- a/be/src/util/openssl-util.cc
+++ b/be/src/util/openssl-util.cc
@@ -31,6 +31,11 @@
 
 #include "common/names.h"
 
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_cipher_list);
+
 namespace impala {
 
 // Counter to track the number of encryption keys generated. Incremented before each key
@@ -47,6 +52,24 @@ int MaxSupportedTlsVersion() {
   return SSLv23_method()->version;
 }
 
+bool IsInternalTlsConfigured() {
+  // Enable SSL between servers only if both the client validation certificate and the
+  // server certificate are specified. 'Client' here means clients that are used by Impala
+  // services to contact other Impala services (as distinct from user clients of Impala
+  // like the shell), and 'servers' are the processes that serve those clients. The server
+  // needs a certificate (FLAGS_ssl_server_certificate) to demonstrate it is who the
+  // client thinks it is; the client needs a certificate (FLAGS_ssl_client_ca_certificate)
+  // to validate that assertion from the server.
+  return !FLAGS_ssl_client_ca_certificate.empty() &&
+      !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty();
+}
+
+bool IsExternalTlsConfigured() {
+  // If the ssl_server_certificate is set, then external TLS is configured, i.e. external
+  // clients can talk to Impala at least over unauthenticated TLS.
+  return !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty();
+}
+
 // Callback used by OpenSSLErr() - write the error given to us through buf to the
 // stringstream that's passed in through ctx.
 static int OpenSSLErrCallback(const char* buf, size_t len, void* ctx) {

http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.h b/be/src/util/openssl-util.h
index 67d014d..7b1b28e 100644
--- a/be/src/util/openssl-util.h
+++ b/be/src/util/openssl-util.h
@@ -47,6 +47,14 @@ namespace impala {
 /// Returns the maximum supported TLS version available in the linked OpenSSL library.
 int MaxSupportedTlsVersion();
 
+/// Returns true if, per the process configuration flags, server<->server communications
+/// should use TLS.
+bool IsInternalTlsConfigured();
+
+/// Returns true if, per the process configuration flags, client<->server communications
+/// should use TLS.
+bool IsExternalTlsConfigured();
+
 /// Add entropy from the system RNG to OpenSSL's global RNG. Called at system startup
 /// and again periodically to add new entropy.
 void SeedOpenSSLRNG();
@@ -138,6 +146,7 @@ class EncryptionKey {
   /// Cipher Mode
   AES_CIPHER_MODE mode_;
 };
+
 }
 
 #endif