You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/02/05 23:15:56 UTC

[2/3] impala git commit: IMPALA-6448: Re-enable kerberized testing with KRPC

IMPALA-6448: Re-enable kerberized testing with KRPC

While working on the patch for IMPALA-5054, we realized that we needed
to make the kudu::rpc::Messenger configurable. That was done on the
Kudu side and is tracked by KUDU-2228. One of the design decisions
taken in that patch was to allow Kerberos to be either on or off for
the entire lifetime of the process. This means that, with KRPC, we can
no longer switch Kerberos on and off within the same process. The
behavior comes from SaslInit() in kudu/rpc/sasl_common.cc: SaslInit()
is called once per process and hard-codes some configuration that
cannot be toggled afterwards.

This affected our kerberized rpc-mgr-tests. This patch splits out
the kerberized part of rpc-mgr-test into rpc-mgr-kerberized-test.

It also moves the code shared by both files into
rpc-mgr-test-base.h.

Change-Id: I6412978316de90875c98f8fbe51c8d215c227b18
Reviewed-on: http://gerrit.cloudera.org:8080/9164
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9ac9f7e3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9ac9f7e3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9ac9f7e3

Branch: refs/heads/master
Commit: 9ac9f7e3f8f327a3cade75548a85106190c8842c
Parents: 30c0375
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jan 30 21:35:28 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 5 22:49:07 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/CMakeLists.txt             |   7 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc |  89 ++++++++
 be/src/rpc/rpc-mgr-test-base.h        | 269 +++++++++++++++++++++++
 be/src/rpc/rpc-mgr-test.cc            | 333 ++---------------------------
 4 files changed, 381 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 7beb80d..e4d96e2 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -50,7 +50,12 @@ ADD_BE_TEST(rpc-mgr-test)
 add_dependencies(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
-target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
+
+ADD_BE_TEST(rpc-mgr-kerberized-test)
+add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
+target_link_libraries(rpc-mgr-kerberized-test ${KRB5_REALM_OVERRIDE})
 
 add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
 add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
new file mode 100644
index 0000000..57ed3eb
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr-test-base.h"
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+
+namespace impala {
+
+static int kdc_port = GetServerPort();
+
+// Fixture that starts a MiniKdc and calls InitAuth() before setting up the base RpcMgr.
+class RpcMgrKerberizedTest :
+    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    string spn = Substitute("impala-test/$0", ip);
+    kdc_wrapper_.reset(new MiniKdcWrapper(
+        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
+    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
+    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
+    RpcMgrTestBase::SetUp();
+  }
+
+  // Undoes SetUp() in reverse order: the RpcMgr is shut down first so a failed KDC
+  // teardown cannot skip it. 'kdc_wrapper_' is null if SetUp() failed early.
+  virtual void TearDown() {
+    RpcMgrTestBase::TearDown();
+    if (kdc_wrapper_.get() == nullptr) return;
+    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
+  }
+
+ private:
+  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
+};
+
+INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
+                        RpcMgrKerberizedTest,
+                        ::testing::Values(USE_KUDU_KERBEROS,
+                                          USE_IMPALA_KERBEROS));
+
+// Runs the multi-service test on a dedicated RpcMgr created with the TLS flags set.
+TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+  // TODO: We're starting a separate RpcMgr here instead of configuring
+  // RpcMgrTestBase::rpc_mgr_ to use TLS. To use RpcMgrTestBase::rpc_mgr_, we need to
+  // introduce new gtest params to turn on TLS which needs to be a coordinated change
+  // across rpc-mgr-test and thrift-server-test.
+
+  // Set the TLS flags before constructing the RpcMgr, like the other TLS tests do: the
+  // IsInternalTlsConfigured() argument is evaluated at construction and presumably
+  // reads these flags (NOTE(review): confirm against util/openssl-util.h).
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  TNetworkAddress tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+} // namespace impala
+
+// Test entry point: initializes gtest and the common runtime, then runs all tests.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+  // Fill in the path of the current binary for use by the tests.
+  CURRENT_EXECUTABLE_PATH = argv[0];
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
new file mode 100644
index 0000000..43b6d83
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/auth-provider.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+#include "testutil/mini-kdc-wrapper.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/counting-barrier.h"
+#include "util/network-util.h"
+#include "util/openssl-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+
+#include "common/names.h"
+
+using kudu::rpc::ServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+
+// The path of the current executable file that is required for passing into the SASL
+// library as the 'application name'.
+static string CURRENT_EXECUTABLE_PATH;
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
+
+// Returns an unused ephemeral port; flags a (non-fatal) gtest failure if it is -1.
+// Declared inline because this header-defined function may be included by several TUs.
+inline int GetServerPort() {
+  const int port = FindUnusedEphemeralPort(nullptr);
+  EXPECT_NE(-1, port);
+  return port; }
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+/// Sets the TLS flags for the object's lifetime and clears them on destruction (they are
+/// assumed to be empty by default). Not copyable: a copy's destructor would clear them.
+class ScopedSetTlsFlags {
+ public:
+  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
+      const string& pkey_passwd = "", const string& ciphers = "") {
+    FLAGS_ssl_server_certificate = cert;
+    FLAGS_ssl_private_key = pkey;
+    FLAGS_ssl_client_ca_certificate = ca_cert;
+    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
+    FLAGS_ssl_cipher_list = ciphers;
+  }
+  ScopedSetTlsFlags(const ScopedSetTlsFlags&) = delete;
+  ScopedSetTlsFlags& operator=(const ScopedSetTlsFlags&) = delete;
+  ~ScopedSetTlsFlags() {
+    FLAGS_ssl_server_certificate = "";
+    FLAGS_ssl_private_key = "";
+    FLAGS_ssl_client_ca_certificate = "";
+    FLAGS_ssl_private_key_password_cmd = "";
+    FLAGS_ssl_cipher_list = "";
+  }
+};
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
+#define PAYLOAD_SIZE (4096)
+
+// Common fixture for RpcMgr tests: owns an RpcMgr plus the address its services use.
+// 'T' is the gtest base class (testing::Test or testing::TestWithParam<>).
+template <class T> class RpcMgrTestBase : public T {
+ public:
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (size_t i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+  MemTracker* service_tracker() { return &service_tracker_; }
+
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+  virtual void TearDown() { rpc_mgr_.Shutdown(); }
+
+ private:
+  // PAYLOAD_SIZE is used as a byte count, so size the array in int32_t elements.
+  int32_t payload_[PAYLOAD_SIZE / sizeof(int32_t)];
+  MemTracker service_tracker_;
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+// Test service whose Ping() handler sets a fixed response and then delegates to 'cb'.
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
+  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
+  // Sets the response to 42, releases the tracked request memory, then invokes 'cb_'.
+  virtual void Ping(
+      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+    response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    cb_(context);
+  }
+ private:
+  MemTracker* mem_tracker_;
+  ServiceCB cb_;
+};
+
+// Test service whose ScanMem() RPC checks that a sidecar payload matches a pattern.
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {}
+
+  // The request comes with an int 'pattern' and a payload of int array sent with
+  // sidecar. Scan the array to make sure every element matches 'pattern'.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (size_t i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_->Release(context->GetTransferSize());
+        // Substitute() placeholders are zero-based: $0 is 'pattern', $1 is 'val'.
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $0; Found $1", pattern, val)));
+        return;
+      }
+    }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    context->RespondSuccess();
+  }
+
+ private:
+  MemTracker* mem_tracker_;
+};
+
+// Registers a Ping and a ScanMem service with 'rpc_mgr', starts them on 'krpc_address'
+// and issues 100 RPCs to a randomly chosen one. Returns OK only if every step succeeds.
+template <class T>
+Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  MemTracker* mem_tracker = test_base->service_tracker();
+  // Test that a service can be started, and will respond to requests.
+  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
+
+  RpcController controller;
+  srandom(0);  // random() below is seeded by srandom(), not by srand().
+  // Randomly invoke either services to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+        return Status(Substitute(
+            "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+            response.int_response()));
+      }
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      test_base->SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+          "unable to execute ScanMem() RPC.");
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index c525148..4c4b100 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -15,124 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr.inline.h"
-
-#include "common/init.h"
-#include "exec/kudu-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "rpc/auth-provider.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-#include "testutil/mini-kdc-wrapper.h"
-#include "testutil/scoped-flag-setter.h"
-#include "util/counting-barrier.h"
-#include "util/network-util.h"
-#include "util/openssl-util.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/rpc_test.proxy.h"
-#include "gen-cpp/rpc_test.service.h"
-
-#include "common/names.h"
-
-using kudu::rpc::ErrorStatusPB;
+#include "rpc/rpc-mgr-test-base.h"
+
 using kudu::rpc::ServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
-using kudu::rpc::RpcSidecar;
 using kudu::MonoDelta;
-using kudu::Slice;
-
-using namespace std;
 
 DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
 DECLARE_string(hostname);
 
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-
-// The path of the current executable file that is required for passing into the SASL
-// library as the 'application name'.
-static string CURRENT_EXECUTABLE_PATH;
-
 namespace impala {
 
-static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
-
-int GetServerPort() {
-  int port = FindUnusedEphemeralPort(nullptr);
-  EXPECT_FALSE(port == -1);
-  return port;
-}
-
-static int kdc_port = GetServerPort();
-
-const static string IMPALA_HOME(getenv("IMPALA_HOME"));
-const string& SERVER_CERT =
-    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
-const string& PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
-const string& BAD_SERVER_CERT =
-    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
-const string& BAD_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
-const string& PASSWORD_PROTECTED_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
-
-// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
-// support.
-const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
-const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
-
-#define PAYLOAD_SIZE (4096)
-
-template <class T> class RpcMgrTestBase : public T {
- public:
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
-  MemTracker* service_tracker() { return &service_tracker_; }
-
- protected:
-  TNetworkAddress krpc_address_;
-  RpcMgr rpc_mgr_;
-
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
-    ASSERT_OK(rpc_mgr_.Init());
-  }
-
-  virtual void TearDown() {
-    rpc_mgr_.Shutdown();
-  }
-
- private:
-  int32_t payload_[PAYLOAD_SIZE];
-  MemTracker service_tracker_;
-};
-
 // For tests that do not require kerberized testing, we use RpcTest.
 class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   virtual void SetUp() {
@@ -144,157 +39,7 @@ class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   }
 };
 
-class RpcMgrKerberizedTest :
-    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    string spn = Substitute("impala-test/$0", ip);
-
-    kdc_wrapper_.reset(new MiniKdcWrapper(
-        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
-    DCHECK(kdc_wrapper_.get() != nullptr);
-
-    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
-    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
-
-    RpcMgrTestBase::SetUp();
-  }
-
-  virtual void TearDown() {
-    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
-    RpcMgrTestBase::TearDown();
-  }
-
- private:
-  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
-};
-
-typedef std::function<void(RpcContext*)> ServiceCB;
-
-class PingServiceImpl : public PingServiceIf {
- public:
-  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
-  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
-      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
-
-  virtual void Ping(
-      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
-    response->set_int_response(42);
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    cb_(context);
-  }
-
- private:
-  MemTracker* mem_tracker_;
-  ServiceCB cb_;
-};
-
-class ScanMemServiceImpl : public ScanMemServiceIf {
- public:
-  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
-  }
-
-  // The request comes with an int 'pattern' and a payload of int array sent with
-  // sidecar. Scan the array to make sure every element matches 'pattern'.
-  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) {
-    int32_t pattern = request->pattern();
-    Slice payload;
-    ASSERT_OK(
-        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
-    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
-
-    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
-    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
-      int32_t val = v[i];
-      if (val != pattern) {
-        // Incoming requests will already be tracked and we need to release the memory.
-        mem_tracker_->Release(context->GetTransferSize());
-        context->RespondFailure(kudu::Status::Corruption(
-            Substitute("Expecting $1; Found $2", pattern, val)));
-        return;
-      }
-    }
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    context->RespondSuccess();
-  }
-
- private:
-  MemTracker* mem_tracker_;
-};
-
-// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
-// Re-enable after fixing.
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_OFF));
-
-template <class T>
-Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
-    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  MemTracker* mem_tracker = test_base->service_tracker();
-  // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
-
-  // Test that a second service, that verifies the RPC payload is not corrupted,
-  // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
-
-  FLAGS_num_acceptor_threads = 2;
-  FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
-
-  unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
-
-  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
-
-  RpcController controller;
-  srand(0);
-  // Randomly invoke either services to make sure a RpcMgr can host multiple
-  // services at the same time.
-  for (int i = 0; i < 100; ++i) {
-    controller.Reset();
-    if (random() % 2 == 0) {
-      PingRequestPB request;
-      PingResponsePB response;
-      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
-          "unable to execute Ping() RPC.");
-      if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
-      }
-    } else {
-      ScanMemRequestPB request;
-      ScanMemResponsePB response;
-      test_base->SetupScanMemRequest(&request, &controller);
-      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
-          "unable to execute ScanMem() RPC.");
-    }
-  }
-
-  return Status::OK();
-}
-
-
-TEST_F(RpcMgrTest, MultipleServices) {
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
-}
-
-TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+TEST_F(RpcMgrTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change across
@@ -307,28 +52,20 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  // Enable TLS.
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
   ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
+TEST_F(RpcMgrTest, MultipleServices) {
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+}
+
 // Test with a misconfigured TLS certificate and verify that an error is thrown.
 TEST_F(RpcMgrTest, BadCertificateTls) {
-
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, "unknown");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -344,16 +81,8 @@ TEST_F(RpcMgrTest, BadCertificateTls) {
 
 // Test with a bad password command for the password protected private key.
 TEST_F(RpcMgrTest, BadPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo badpassword");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo badpassword");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -369,16 +98,8 @@ TEST_F(RpcMgrTest, BadPasswordTls) {
 
 // Test with a correct password command for the password protected private key.
 TEST_F(RpcMgrTest, CorrectPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo password");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo password");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -395,14 +116,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
 
 // Test with a bad TLS cipher and verify that an error is thrown.
 TEST_F(RpcMgrTest, BadCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", "not_a_cipher");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -418,14 +132,8 @@ TEST_F(RpcMgrTest, BadCiphersTls) {
 
 // Test with a valid TLS cipher.
 TEST_F(RpcMgrTest, ValidCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, TLS1_0_COMPATIBLE_CIPHER);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "",
+      TLS1_0_COMPATIBLE_CIPHER);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -444,14 +152,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
 TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
       TLS1_0_COMPATIBLE_CIPHER_2);
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;