You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/02/02 18:51:32 UTC
[08/19] impala git commit: IMPALA-5054: [SECURITY] Enable KRPC w/ TLS
in Impala
IMPALA-5054: [SECURITY] Enable KRPC w/ TLS in Impala
KRPC has some flags that turn on TLS. This patch sets those to enable
TLS communication.
Tests are added to rpc-mgr-test.
TODO: Kudu Kerberos testing is disabled. It will be re-enabled as part of IMPALA-6448.
Change-Id: I9a14a44fdea9ab668f3714eb69fdb188bce38f5a
Reviewed-on: http://gerrit.cloudera.org:8080/8439
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68b7c8b8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68b7c8b8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68b7c8b8
Branch: refs/heads/2.x
Commit: 68b7c8b8aabd1a65f325f971dc861f6cb2eff5ad
Parents: 885776e
Author: Sailesh Mukil <sa...@apache.org>
Authored: Sun Oct 29 18:38:57 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:15 2018 +0000
----------------------------------------------------------------------
be/src/catalog/catalogd-main.cc | 9 +-
be/src/rpc/authentication-test.cc | 5 +-
be/src/rpc/rpc-mgr-test.cc | 274 +++++++++++++++++++++---
be/src/rpc/rpc-mgr.cc | 31 ++-
be/src/rpc/rpc-mgr.h | 6 +
be/src/rpc/thrift-server.cc | 14 --
be/src/rpc/thrift-server.h | 4 -
be/src/runtime/exec-env.cc | 3 +-
be/src/service/impala-server.cc | 9 +-
be/src/statestore/statestore-subscriber.cc | 5 +-
be/src/statestore/statestore.cc | 5 +-
be/src/statestore/statestored-main.cc | 3 +-
be/src/testutil/in-process-servers.cc | 3 +-
be/src/util/openssl-util.cc | 23 ++
be/src/util/openssl-util.h | 9 +
15 files changed, 332 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index f98a406..95e5dcd 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -29,14 +29,15 @@
#include "rpc/thrift-server.h"
#include "runtime/mem-tracker.h"
#include "service/fe-support.h"
+#include "util/common-metrics.h"
#include "util/debug-util.h"
#include "util/jni-util.h"
+#include "util/default-path-handlers.h"
+#include "util/memory-metrics.h"
#include "util/metrics.h"
-#include "util/common-metrics.h"
#include "util/network-util.h"
-#include "util/memory-metrics.h"
+#include "util/openssl-util.h"
#include "util/webserver.h"
-#include "util/default-path-handlers.h"
DECLARE_string(classpath);
DECLARE_string(principal);
@@ -95,7 +96,7 @@ int CatalogdMain(int argc, char** argv) {
ThriftServer* server;
ThriftServerBuilder builder("CatalogService", processor, FLAGS_catalog_service_port);
- if (EnableInternalSslConnections()) {
+ if (IsInternalTlsConfigured()) {
SSLProtocol ssl_version;
ABORT_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/authentication-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication-test.cc b/be/src/rpc/authentication-test.cc
index 2b204ad..0dacdee 100644
--- a/be/src/rpc/authentication-test.cc
+++ b/be/src/rpc/authentication-test.cc
@@ -22,6 +22,7 @@
#include "rpc/thrift-server.h"
#include "util/auth-util.h"
#include "util/network-util.h"
+#include "util/openssl-util.h"
#include "util/thread.h"
#include <ldap.h>
@@ -32,6 +33,7 @@ DECLARE_string(keytab_file);
DECLARE_string(principal);
DECLARE_string(ssl_client_ca_certificate);
DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
DECLARE_string(internal_principals_whitelist);
// These are here so that we can grab them early in main() - the kerberos
@@ -172,7 +174,8 @@ TEST(Auth, KerbAndSslEnabled) {
ASSERT_OK(GetHostname(&hostname));
FLAGS_ssl_client_ca_certificate = "some_path";
FLAGS_ssl_server_certificate = "some_path";
- ASSERT_TRUE(EnableInternalSslConnections());
+ FLAGS_ssl_private_key = "some_path";
+ ASSERT_TRUE(IsInternalTlsConfigured());
SaslAuthProvider sa_internal(true);
ASSERT_OK(
sa_internal.InitKerberos("service_name/_HOST@some.realm", "/etc/hosts"));
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 441619b..7effda9 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -28,8 +28,10 @@
#include "rpc/auth-provider.h"
#include "testutil/gtest-util.h"
#include "testutil/mini-kdc-wrapper.h"
+#include "testutil/scoped-flag-setter.h"
#include "util/counting-barrier.h"
#include "util/network-util.h"
+#include "util/openssl-util.h"
#include "util/test-info.h"
#include "gen-cpp/rpc_test.proxy.h"
@@ -51,6 +53,11 @@ DECLARE_int32(num_reactor_threads);
DECLARE_int32(num_acceptor_threads);
DECLARE_string(hostname);
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
// The path of the current executable file that is required for passing into the SASL
// library as the 'application name'.
@@ -68,9 +75,41 @@ int GetServerPort() {
static int kdc_port = GetServerPort();
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+ Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+ Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+ Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+ Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+ Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
#define PAYLOAD_SIZE (4096)
template <class T> class RpcMgrTestBase : public T {
+ public:
+ // Utility function to initialize the parameter for ScanMem RPC.
+ // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+ // to 'controller'. Also sets up 'request' with the random value and index of the
+ // sidecar.
+ void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+ int32_t pattern = random();
+ for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+ int idx;
+ Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+ controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+ request->set_pattern(pattern);
+ request->set_sidecar_idx(idx);
+ }
+
protected:
TNetworkAddress krpc_address_;
RpcMgr rpc_mgr_;
@@ -86,20 +125,6 @@ template <class T> class RpcMgrTestBase : public T {
rpc_mgr_.Shutdown();
}
- // Utility function to initialize the parameter for ScanMem RPC.
- // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
- // to 'controller'. Also sets up 'request' with the random value and index of the
- // sidecar.
- void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
- int32_t pattern = random();
- for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
- int idx;
- Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
- controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
- request->set_pattern(pattern);
- request->set_sidecar_idx(idx);
- }
-
private:
int32_t payload_[PAYLOAD_SIZE];
};
@@ -191,35 +216,35 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
}
};
-// TODO: Disabled 'USE_KUDU_KERBEROS' and 'USE_IMPALA_KERBEROS' due to IMPALA-6268.
-// Reenable after fixing.
+// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
+// Re-enable after fixing.
INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
RpcMgrKerberizedTest,
- ::testing::Values(KERBEROS_OFF,
- USE_KUDU_KERBEROS,
- USE_IMPALA_KERBEROS));
+ ::testing::Values(KERBEROS_OFF));
-TEST_P(RpcMgrKerberizedTest, MultipleServices) {
+template <class T>
+Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
+ RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
// Test that a service can be started, and will respond to requests.
unique_ptr<ServiceIf> ping_impl(
- new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
- ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl)));
+ new PingServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+ RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl)));
// Test that a second service, that verifies the RPC payload is not corrupted,
// can be started.
unique_ptr<ServiceIf> scan_mem_impl(
- new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
- ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+ new ScanMemServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+ RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl)));
FLAGS_num_acceptor_threads = 2;
FLAGS_num_reactor_threads = 10;
- ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+ RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
unique_ptr<PingServiceProxy> ping_proxy;
- ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy));
+ RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
- ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
+ RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
RpcController controller;
srand(0);
@@ -230,17 +255,200 @@ TEST_P(RpcMgrKerberizedTest, MultipleServices) {
if (random() % 2 == 0) {
PingRequestPB request;
PingResponsePB response;
- kudu::Status status = ping_proxy->Ping(request, &response, &controller);
- ASSERT_TRUE(status.ok());
- ASSERT_EQ(response.int_response(), 42);
+ KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+ "unable to execute Ping() RPC.");
+ if (response.int_response() != 42) {
+ return Status(Substitute(
+ "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+ response.int_response()));
+ }
} else {
ScanMemRequestPB request;
ScanMemResponsePB response;
- SetupScanMemRequest(&request, &controller);
- kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller);
- ASSERT_TRUE(status.ok());
+ test_base->SetupScanMemRequest(&request, &controller);
+ KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+ "unable to execute ScanMem() RPC.");
}
}
+
+ return Status::OK();
+}
+
+
+TEST_F(RpcMgrTest, MultipleServices) {
+ ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+}
+
+TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+ // TODO: We're starting a separate RpcMgr here instead of configuring
+ // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
+ // new gtest params to turn on TLS which needs to be a coordinated change across
+ // rpc-mgr-test and thrift-server-test.
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ // Enable TLS.
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ ASSERT_OK(tls_rpc_mgr.Init());
+
+ ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with a misconfigured TLS certificate and verify that Init() returns an error.
+TEST_F(RpcMgrTest, BadCertificateTls) {
+
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, "unknown");
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with a bad password command for the password protected private key.
+TEST_F(RpcMgrTest, BadPasswordTls) {
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(
+ &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ auto password_cmd =
+ ScopedFlagSetter<string>::Make(
+ &FLAGS_ssl_private_key_password_cmd, "echo badpassword");
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with a correct password command for the password protected private key.
+TEST_F(RpcMgrTest, CorrectPasswordTls) {
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(
+ &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ auto password_cmd =
+ ScopedFlagSetter<string>::Make(
+ &FLAGS_ssl_private_key_password_cmd, "echo password");
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_OK(tls_rpc_mgr.Init());
+ ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with a bad TLS cipher and verify that Init() returns an error.
+TEST_F(RpcMgrTest, BadCiphersTls) {
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ auto cipher =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher");
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with a valid TLS cipher.
+TEST_F(RpcMgrTest, ValidCiphersTls) {
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ auto cipher =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, TLS1_0_COMPATIBLE_CIPHER);
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_OK(tls_rpc_mgr.Init());
+ ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+ tls_rpc_mgr.Shutdown();
+}
+
+// Test with multiple valid TLS ciphers.
+TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
+ const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
+ TLS1_0_COMPATIBLE_CIPHER_2);
+ auto cert_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
+ auto pkey_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
+ auto ca_flag =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+ auto cipher =
+ ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list);
+
+ RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+ TNetworkAddress tls_krpc_address;
+ IpAddr ip;
+ ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+ int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+ tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+ ASSERT_OK(tls_rpc_mgr.Init());
+ ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+ tls_rpc_mgr.Shutdown();
}
TEST_F(RpcMgrTest, SlowCallback) {
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index d4e8fe1..c70c117 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -27,6 +27,7 @@
#include "util/auth-util.h"
#include "util/cpu-info.h"
#include "util/network-util.h"
+#include "util/openssl-util.h"
#include "common/names.h"
@@ -43,6 +44,15 @@ using kudu::Sockaddr;
DECLARE_string(hostname);
DECLARE_string(principal);
DECLARE_string(be_principal);
+DECLARE_string(keytab_file);
+
+// Impala's TLS flags.
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+DECLARE_string(ssl_minimum_version);
// Defined in kudu/rpc/rpcz_store.cc
DECLARE_int32(rpc_duration_too_long_ms);
@@ -82,9 +92,23 @@ Status RpcMgr::Init() {
RETURN_IF_ERROR(ParseKerberosPrincipal(internal_principal, &service_name,
&unused_hostname, &unused_realm));
bld.set_sasl_proto_name(service_name);
- // TODO: Once the Messenger can take more options pertaining to 'rpc_authentication'
- // and more, we need to explicitly set those options here. (KUDU-2228)
+ bld.set_rpc_authentication("required");
+ bld.set_keytab_file(FLAGS_keytab_file);
+ }
+
+ if (use_tls_) {
+ LOG (INFO) << "Initing RpcMgr with TLS";
+ bld.set_epki_cert_key_files(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
+ bld.set_epki_certificate_authority_file(FLAGS_ssl_client_ca_certificate);
+ bld.set_epki_private_password_key_cmd(FLAGS_ssl_private_key_password_cmd);
+ if (!FLAGS_ssl_cipher_list.empty()) {
+ bld.set_rpc_tls_ciphers(FLAGS_ssl_cipher_list);
+ }
+ bld.set_rpc_tls_min_protocol(FLAGS_ssl_minimum_version);
+ bld.set_rpc_encryption("required");
+ bld.enable_inbound_tls();
}
+
KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger");
return Status::OK();
}
@@ -97,8 +121,7 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
new ImpalaServicePool(std::move(service_ptr),
messenger_->metric_entity(), service_queue_depth);
// Start the thread pool first before registering the service in case the startup fails.
- RETURN_IF_ERROR(
- service_pool->Init(num_service_threads));
+ RETURN_IF_ERROR(service_pool->Init(num_service_threads));
KUDU_RETURN_IF_ERROR(
messenger_->RegisterService(service_pool->service_name(), service_pool),
"Could not register service");
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index 26dbae0..b2099f2 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -97,6 +97,8 @@ namespace impala {
/// port is configurable via FLAGS_acceptor_threads.
class RpcMgr {
public:
+ RpcMgr(bool use_tls = false) : use_tls_(use_tls) {}
+
/// Initializes the reactor threads, and prepares for sending outbound RPC requests.
Status Init() WARN_UNUSED_RESULT;
@@ -176,6 +178,10 @@ class RpcMgr {
/// True after StartServices() completes.
bool services_started_ = false;
+
+ /// True if TLS is configured for communication between Impala backends. messenger_ will
+ /// be configured to use TLS if this is set.
+ const bool use_tls_;
};
} // namespace impala
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 75ad424..ded710e 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -63,9 +63,6 @@ DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
DECLARE_string(principal);
DECLARE_string(keytab_file);
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_cipher_list);
namespace impala {
@@ -103,17 +100,6 @@ bool SSLProtoVersions::IsSupported(const SSLProtocol& protocol) {
}
}
-bool EnableInternalSslConnections() {
- // Enable SSL between servers only if both the client validation certificate and the
- // server certificate are specified. 'Client' here means clients that are used by Impala
- // services to contact other Impala services (as distinct from user clients of Impala
- // like the shell), and 'servers' are the processes that serve those clients. The server
- // needs a certificate to demonstrate it is who the client thinks it is; the client
- // needs a certificate to validate that assertion from the server.
- return !FLAGS_ssl_client_ca_certificate.empty() &&
- !FLAGS_ssl_server_certificate.empty();
-}
-
// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 588904f..d95d90e 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -341,10 +341,6 @@ struct SSLProtoVersions {
static bool IsSupported(const apache::thrift::transport::SSLProtocol& protocol);
};
-// Returns true if, per the process configuration flags, server<->server communications
-// should use SSL.
-bool EnableInternalSslConnections();
-
}
#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0e5b0f6..0551848 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -58,6 +58,7 @@
#include "util/memory-metrics.h"
#include "util/metrics.h"
#include "util/network-util.h"
+#include "util/openssl-util.h"
#include "util/parse-util.h"
#include "util/pretty-printer.h"
#include "util/thread-pool.h"
@@ -180,7 +181,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
VLOG_QUERY << "Using KRPC.";
// KRPC relies on resolved IP address. It's set in StartServices().
krpc_address_.__set_port(krpc_port);
- rpc_mgr_.reset(new RpcMgr());
+ rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
} else {
stream_mgr_.reset(new DataStreamMgr(metrics_.get()));
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index a62130c..af79180 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -67,6 +67,7 @@
#include "util/impalad-metrics.h"
#include "util/lineage-util.h"
#include "util/network-util.h"
+#include "util/openssl-util.h"
#include "util/parse-util.h"
#include "util/redactor.h"
#include "util/runtime-profile-counters.h"
@@ -2025,7 +2026,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
if (FLAGS_is_coordinator) exec_env_->frontend()->WaitForCatalog();
SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
- if (!FLAGS_ssl_server_certificate.empty() || EnableInternalSslConnections()) {
+ if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
RETURN_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
}
@@ -2041,7 +2042,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port);
- if (EnableInternalSslConnections()) {
+ if (IsInternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for backend";
be_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
@@ -2067,7 +2068,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
beeswax_processor->setEventHandler(event_handler);
ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
- if (!FLAGS_ssl_server_certificate.empty()) {
+ if (IsExternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for Beeswax";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
@@ -2094,7 +2095,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port,
ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
- if (!FLAGS_ssl_server_certificate.empty()) {
+ if (IsExternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for HiveServer2";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 99da183..e58c177 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -32,8 +32,9 @@
#include "rpc/rpc-trace.h"
#include "rpc/thrift-util.h"
#include "statestore/statestore-service-client-wrapper.h"
-#include "util/time.h"
#include "util/debug-util.h"
+#include "util/openssl-util.h"
+#include "util/time.h"
#include "common/names.h"
@@ -197,7 +198,7 @@ Status StatestoreSubscriber::Start() {
ThriftServerBuilder builder(
"StatestoreSubscriber", processor, heartbeat_address_.port);
- if (EnableInternalSslConnections()) {
+ if (IsInternalTlsConfigured()) {
SSLProtocol ssl_version;
RETURN_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index b135e38..0f72e58 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -29,6 +29,7 @@
#include "statestore/statestore-subscriber-client-wrapper.h"
#include "util/debug-util.h"
#include "util/logging-support.h"
+#include "util/openssl-util.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "util/webserver.h"
@@ -225,11 +226,11 @@ Statestore::Statestore(MetricGroup* metrics)
update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_update_tcp_timeout_seconds * 1000,
FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
- EnableInternalSslConnections())),
+ IsInternalTlsConfigured())),
heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "",
- EnableInternalSslConnections())),
+ IsInternalTlsConfigured())),
thrift_iface_(new StatestoreThriftIf(this)),
failure_detector_(new MissedHeartbeatFailureDetector(
FLAGS_statestore_max_missed_heartbeats,
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1a11237..633d449 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -31,6 +31,7 @@
#include "util/common-metrics.h"
#include "util/debug-util.h"
#include "util/metrics.h"
+#include "util/openssl-util.h"
#include "util/memory-metrics.h"
#include "util/webserver.h"
#include "util/default-path-handlers.h"
@@ -91,7 +92,7 @@ int StatestoredMain(int argc, char** argv) {
ThriftServer* server;
ThriftServerBuilder builder("StatestoreService", processor, FLAGS_state_store_port);
- if (EnableInternalSslConnections()) {
+ if (IsInternalTlsConfigured()) {
SSLProtocol ssl_version;
ABORT_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 4817d7f..7ff44a8 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -27,6 +27,7 @@
#include "util/webserver.h"
#include "util/default-path-handlers.h"
#include "util/metrics.h"
+#include "util/openssl-util.h"
#include "runtime/exec-env.h"
#include "service/impala-server.h"
@@ -149,7 +150,7 @@ Status InProcessStatestore::Start() {
new StatestoreServiceProcessor(statestore_->thrift_iface()));
ThriftServerBuilder builder("StatestoreService", processor, statestore_port_);
- if (EnableInternalSslConnections()) {
+ if (IsInternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for Statestore";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key);
}
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.cc b/be/src/util/openssl-util.cc
index 264a49a..69dc676 100644
--- a/be/src/util/openssl-util.cc
+++ b/be/src/util/openssl-util.cc
@@ -31,6 +31,11 @@
#include "common/names.h"
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_cipher_list);
+
namespace impala {
// Counter to track the number of encryption keys generated. Incremented before each key
@@ -47,6 +52,24 @@ int MaxSupportedTlsVersion() {
return SSLv23_method()->version;
}
+bool IsInternalTlsConfigured() {
+ // Enable SSL between servers only if the client validation certificate, the server
+ // certificate and the private key are all specified. 'Client' here means clients that
+ // Impala services use to contact other Impala services (as distinct from user clients
+ // of Impala like the shell), and 'servers' are the processes that serve those clients.
+ // The server needs a certificate (FLAGS_ssl_server_certificate) and its private key
+ // (FLAGS_ssl_private_key) to demonstrate it is who the client thinks it is; the client
+ // needs a certificate (FLAGS_ssl_client_ca_certificate) to validate that assertion.
+ return !FLAGS_ssl_client_ca_certificate.empty() &&
+ !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty();
+}
+
+bool IsExternalTlsConfigured() {
+ // External TLS is configured if both ssl_server_certificate and ssl_private_key are
+ // set, i.e. external clients can talk to Impala at least over unauthenticated TLS.
+ return !FLAGS_ssl_server_certificate.empty() && !FLAGS_ssl_private_key.empty();
+}
+
// Callback used by OpenSSLErr() - write the error given to us through buf to the
// stringstream that's passed in through ctx.
static int OpenSSLErrCallback(const char* buf, size_t len, void* ctx) {
http://git-wip-us.apache.org/repos/asf/impala/blob/68b7c8b8/be/src/util/openssl-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.h b/be/src/util/openssl-util.h
index 67d014d..7b1b28e 100644
--- a/be/src/util/openssl-util.h
+++ b/be/src/util/openssl-util.h
@@ -47,6 +47,14 @@ namespace impala {
/// Returns the maximum supported TLS version available in the linked OpenSSL library.
int MaxSupportedTlsVersion();
+/// Returns true if, per the process configuration flags, server<->server communications
+/// should use TLS.
+bool IsInternalTlsConfigured();
+
+/// Returns true if, per the process configuration flags, client<->server communications
+/// should use TLS.
+bool IsExternalTlsConfigured();
+
/// Add entropy from the system RNG to OpenSSL's global RNG. Called at system startup
/// and again periodically to add new entropy.
void SeedOpenSSLRNG();
@@ -138,6 +146,7 @@ class EncryptionKey {
/// Cipher Mode
AES_CIPHER_MODE mode_;
};
+
}
#endif