You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/02/05 23:15:55 UTC

[1/3] impala git commit: IMPALA-6449: Use CLOCK_MONOTONIC in ConditionVariable

Repository: impala
Updated Branches:
  refs/heads/master c856b30e3 -> 9b68645f9


IMPALA-6449: Use CLOCK_MONOTONIC in ConditionVariable

ConditionVariable is a thin wrapper around pthread_cond_*.
Currently, pthread_cond_timedwait() uses the default attribute
CLOCK_REALTIME. This is susceptible to adjustment to the system
clock from various sources such as NTP and time may go backward.
This change fixes the problem by switching to using CLOCK_MONOTONIC
so time will be monotonic although the frequency of the clock ticks
may still be adjusted by NTP. Ideally, we should use CLOCK_MONOTONIC_RAW
but it's available only on Linux kernel 2.6.28 or latter. This change
also get rids of some usage of boost::get_system_time() which suffers
from the same problem.

Change-Id: I81611cfd5e7c5347203fe7fa6b0f615602257f87
Reviewed-on: http://gerrit.cloudera.org:8080/9158
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30c0375e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30c0375e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30c0375e

Branch: refs/heads/master
Commit: 30c0375ed358f8040d28fe756a17c6e3965177b1
Parents: c856b30
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Jan 29 18:07:28 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 5 22:22:45 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/thrift-server.cc               |  4 +--
 be/src/runtime/fragment-instance-state.cc |  7 ++---
 be/src/service/impala-server.cc           |  2 +-
 be/src/util/blocking-queue.h              |  6 ++--
 be/src/util/condition-variable.h          | 40 +++++++++++++-------------
 be/src/util/promise.h                     |  8 ++----
 be/src/util/time.h                        | 13 +++++++++
 7 files changed, 44 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index ded710e..48fb1b9 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -165,8 +165,8 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
       &ThriftServer::ThriftServerEventProcessor::Supervise, this,
       &thrift_server_->server_thread_));
 
-  system_time deadline = get_system_time() +
-      posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
+  timespec deadline;
+  TimeFromNowMillis(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS, &deadline);
 
   // Loop protects against spurious wakeup. Locks provide necessary fences to ensure
   // visibility.

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 16b4a7e..ad9e99e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -341,19 +341,16 @@ void FragmentInstanceState::ReportProfileThread() {
   // updates at once so its better for contention as well as smoother progress
   // reporting.
   int report_fragment_offset = rand() % FLAGS_status_report_interval;
-  boost::posix_time::seconds wait_duration(report_fragment_offset);
   // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.WaitFor(l, wait_duration);
+  stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_SEC);
 
   while (report_thread_active_) {
-    boost::posix_time::seconds loop_wait_duration(FLAGS_status_report_interval);
-
     // timed_wait can return because the timeout occurred or the condition variable
     // was signaled.  We can't rely on its return value to distinguish between the
     // two cases (e.g. there is a race here where the wait timed out but before grabbing
     // the lock, the condition variable was signaled).  Instead, we will use an external
     // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.WaitFor(l, loop_wait_duration);
+    stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval * MICROS_PER_SEC);
 
     if (!report_thread_active_) break;
     SendReport(false, Status::OK());

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 0c5f75b..cf5f5fb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1772,7 +1772,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
         session_timeout_cv_.Wait(timeout_lock);
       } else {
         // Sleep for a second before checking whether an active session can be expired.
-        session_timeout_cv_.WaitFor(timeout_lock, seconds(1));
+        session_timeout_cv_.WaitFor(timeout_lock, MICROS_PER_SEC);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/util/blocking-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index a4b1b8f..1dd90d5 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -138,13 +138,13 @@ class BlockingQueue : public CacheLineAligned {
   bool BlockingPutWithTimeout(V&& val, int64_t timeout_micros) {
     MonotonicStopWatch timer;
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
-    boost::system_time wtime = boost::get_system_time() +
-        boost::posix_time::microseconds(timeout_micros);
+    timespec abs_time;
+    TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
     while (SizeLocked(write_lock) >= max_elements_ && !shutdown_ && notified) {
       timer.Start();
       // Wait until we're notified or until the timeout expires.
-      notified = put_cv_.WaitUntil(write_lock, wtime);
+      notified = put_cv_.WaitUntil(write_lock, abs_time);
       timer.Stop();
     }
     total_put_wait_time_ += timer.ElapsedTime();

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/util/condition-variable.h
----------------------------------------------------------------------
diff --git a/be/src/util/condition-variable.h b/be/src/util/condition-variable.h
index c1a1e56..e463790 100644
--- a/be/src/util/condition-variable.h
+++ b/be/src/util/condition-variable.h
@@ -24,13 +24,23 @@
 #include <pthread.h>
 #include <unistd.h>
 
+#include "util/time.h"
+
 namespace impala {
 
 /// Simple wrapper around POSIX pthread condition variable. This has lower overhead than
 /// boost's implementation as it doesn't implement boost thread interruption.
 class ConditionVariable {
  public:
-  ConditionVariable() { pthread_cond_init(&cv_, NULL); }
+  ConditionVariable() {
+    pthread_condattr_t attrs;
+    int retval = pthread_condattr_init(&attrs);
+    DCHECK_EQ(0, retval);
+    pthread_condattr_setclock(&attrs, CLOCK_MONOTONIC);
+    retval = pthread_cond_init(&cv_, &attrs);
+    DCHECK_EQ(0, retval);
+    pthread_condattr_destroy(&attrs);
+  }
 
   ~ConditionVariable() { pthread_cond_destroy(&cv_); }
 
@@ -41,32 +51,22 @@ class ConditionVariable {
     pthread_cond_wait(&cv_, mutex);
   }
 
-  /// Wait until the condition variable is notified or 'timeout' has passed.
+  /// Wait until the condition variable is notified or 'abs_time' has passed.
   /// Returns true if the condition variable is notified before the absolute timeout
-  /// specified in 'timeout' has passed. Returns false otherwise.
-  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
-      const timespec& abs_time) {
+  /// specified in 'abs_time' has passed. Returns false otherwise.
+  bool WaitUntil(boost::unique_lock<boost::mutex>& lock, const timespec& abs_time) {
     DCHECK(lock.owns_lock());
     pthread_mutex_t* mutex = lock.mutex()->native_handle();
     return pthread_cond_timedwait(&cv_, mutex, &abs_time) == 0;
   }
 
-  /// Wait until the condition variable is notified or 'abs_time' has passed.
-  /// Returns true if the condition variable is notified before the absolute timeout
-  /// specified in 'abs_time' has passed. Returns false otherwise.
-  bool WaitUntil(boost::unique_lock<boost::mutex>& lock,
-      const boost::system_time& abs_time) {
-    return WaitUntil(lock, to_timespec(abs_time));
-  }
-
-  /// Wait until the condition variable is notified or have waited for the time
-  /// specified in 'wait_duration'.
-  /// Returns true if the condition variable is notified in time.
+  /// Wait until the condition variable is notified or 'duration_us' microseconds
+  /// have passed. Returns true if the condition variable is notified in time.
   /// Returns false otherwise.
-  template <typename duration_type>
-  bool WaitFor(boost::unique_lock<boost::mutex>& lock,
-      const duration_type& wait_duration) {
-    return WaitUntil(lock, to_timespec(boost::get_system_time() + wait_duration));
+  bool WaitFor(boost::unique_lock<boost::mutex>& lock, int64_t duration_us) {
+    timespec deadline;
+    TimeFromNowMicros(duration_us, &deadline);
+    return WaitUntil(lock, deadline);
   }
 
   /// Notify a single waiter on this condition variable.

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/util/promise.h b/be/src/util/promise.h
index 5de2d13..c93d9f2 100644
--- a/be/src/util/promise.h
+++ b/be/src/util/promise.h
@@ -77,17 +77,15 @@ class Promise {
   /// timed_out: Indicates whether Get() returned due to timeout. Must be non-NULL.
   const T& Get(int64_t timeout_millis, bool* timed_out) {
     DCHECK_GT(timeout_millis, 0);
-    int64_t timeout_micros = timeout_millis * 1000;
+    int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI;
     DCHECK(timed_out != NULL);
     boost::unique_lock<boost::mutex> l(val_lock_);
     int64_t start;
     int64_t now;
     now = start = MonotonicMicros();
     while (!val_is_set_ && (now - start) < timeout_micros) {
-      boost::posix_time::microseconds wait_time =
-          boost::posix_time::microseconds(std::max<int64_t>(
-              1, timeout_micros - (now - start)));
-      val_set_cond_.WaitFor(l, wait_time);
+      int64_t wait_time_micros = std::max<int64_t>(1, timeout_micros - (now - start));
+      val_set_cond_.WaitFor(l, wait_time_micros);
       now = MonotonicMicros();
     }
     *timed_out = !val_is_set_;

http://git-wip-us.apache.org/repos/asf/impala/blob/30c0375e/be/src/util/time.h
----------------------------------------------------------------------
diff --git a/be/src/util/time.h b/be/src/util/time.h
index cef14c8..64dbf9c 100644
--- a/be/src/util/time.h
+++ b/be/src/util/time.h
@@ -57,6 +57,19 @@ inline int64_t UnixMillis() {
   return GetCurrentTimeMicros() / MICROS_PER_MILLI;
 }
 
+/// Return the time 'time_us' microseconds away from now in 'abs_time'.
+inline void TimeFromNowMicros(int64_t time_us, timespec* abs_time) {
+  clock_gettime(CLOCK_MONOTONIC, abs_time);
+  abs_time->tv_nsec += (time_us % MICROS_PER_SEC) * NANOS_PER_MICRO;
+  abs_time->tv_sec += time_us / MICROS_PER_SEC + abs_time->tv_nsec / NANOS_PER_SEC;
+  abs_time->tv_nsec %= NANOS_PER_SEC;
+}
+
+/// Return the time 'time_ms' milliseconds away from now in 'abs_time'.
+inline void TimeFromNowMillis(int64_t time_ms, timespec* abs_time) {
+  TimeFromNowMicros(time_ms * MICROS_PER_MILLI, abs_time);
+}
+
 /// Returns the number of microseconds that have passed since the Unix epoch. This is
 /// affected by manual changes to the system clock but is more suitable for use across
 /// a cluster. For more accurate timings on the local host use the monotonic functions


[3/3] impala git commit: IMPALA-6219: Use AES-GCM for spill-to-disk encryption

Posted by sa...@apache.org.
IMPALA-6219: Use AES-GCM for spill-to-disk encryption

AES-GCM can be very fast(~10 times faster than CFB+SHA256), but it
requires an instruction that Impala can currently run without (CLMUL).
In order to be fast, we dispatch to GCM mode at run-time based on the
CPU and OpenSSL version.

Testing:
run runtime tmp-file-mgr-test, openssl-util-test, buffer-pool-test
and buffered-tuple-stream-test.
add two cases GcmIntegrity & EncryptoArbitraryLength for
openssl-util-test

Change-Id: I1ea87b82a8897ee8bfa187715ac1c52883790d24
Reviewed-on: http://gerrit.cloudera.org:8080/9032
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9b68645f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9b68645f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9b68645f

Branch: refs/heads/master
Commit: 9b68645f9eb9e08899fda860e0946cc05f205479
Parents: 9ac9f7e
Author: Xianda Ke <ke...@gmail.com>
Authored: Tue Jan 16 16:23:28 2018 +0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 5 22:58:32 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/tmp-file-mgr.cc   | 15 ++++--
 be/src/util/cpu-info.cc          | 13 ++---
 be/src/util/cpu-info.h           | 13 ++---
 be/src/util/openssl-util-test.cc | 95 +++++++++++++++++++++++-----------
 be/src/util/openssl-util.cc      | 96 ++++++++++++++++++++++++++++++++---
 be/src/util/openssl-util.h       | 70 +++++++++++++++++--------
 6 files changed, 228 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index d35d302..3807670 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -612,19 +612,26 @@ void TmpFileMgr::WriteHandle::WaitForWrite() {
 Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
   DCHECK(FLAGS_disk_spill_encryption);
   SCOPED_TIMER(encryption_timer_);
-  // Since we're using AES-CTR/AES-CFB mode, we must take care not to reuse a
+  // Since we're using GCM/CTR/CFB mode, we must take care not to reuse a
   // key/IV pair. Regenerate a new key and IV for every data buffer we write.
   key_.InitializeRandom();
   RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));
-  hash_.Compute(buffer.data(), buffer.len());
+
+  if (!key_.IsGcmMode()) {
+    hash_.Compute(buffer.data(), buffer.len());
+  }
   return Status::OK();
 }
 
 Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
   DCHECK(FLAGS_disk_spill_encryption);
   SCOPED_TIMER(encryption_timer_);
-  if (!hash_.Verify(buffer.data(), buffer.len())) {
-    return Status("Block verification failure");
+
+  // GCM mode will verify the integrity by itself
+  if (!key_.IsGcmMode()) {
+    if (!hash_.Verify(buffer.data(), buffer.len())) {
+      return Status("Block verification failure");
+    }
   }
   return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index a32571e..1e3fcde 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -85,12 +85,13 @@ static struct {
   int64_t flag;
 } flag_mappings[] =
 {
-  { "ssse3",  CpuInfo::SSSE3 },
-  { "sse4_1", CpuInfo::SSE4_1 },
-  { "sse4_2", CpuInfo::SSE4_2 },
-  { "popcnt", CpuInfo::POPCNT },
-  { "avx",    CpuInfo::AVX },
-  { "avx2",   CpuInfo::AVX2 },
+  { "ssse3",     CpuInfo::SSSE3 },
+  { "sse4_1",    CpuInfo::SSE4_1 },
+  { "sse4_2",    CpuInfo::SSE4_2 },
+  { "popcnt",    CpuInfo::POPCNT },
+  { "avx",       CpuInfo::AVX },
+  { "avx2",      CpuInfo::AVX2 },
+  { "pclmuldqd", CpuInfo::PCLMULQDQ }
 };
 static const long num_flags = sizeof(flag_mappings) / sizeof(flag_mappings[0]);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index 38d6782..e60babc 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -34,12 +34,13 @@ namespace impala {
 /// /sys/devices)
 class CpuInfo {
  public:
-  static const int64_t SSSE3   = (1 << 1);
-  static const int64_t SSE4_1  = (1 << 2);
-  static const int64_t SSE4_2  = (1 << 3);
-  static const int64_t POPCNT  = (1 << 4);
-  static const int64_t AVX     = (1 << 5);
-  static const int64_t AVX2    = (1 << 6);
+  static const int64_t SSSE3     = (1 << 1);
+  static const int64_t SSE4_1    = (1 << 2);
+  static const int64_t SSE4_2    = (1 << 3);
+  static const int64_t POPCNT    = (1 << 4);
+  static const int64_t AVX       = (1 << 5);
+  static const int64_t AVX2      = (1 << 6);
+  static const int64_t PCLMULQDQ = (1 << 7);
 
   /// Cache enums for L1 (data), L2 and L3
   enum CacheLevel {

http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/util/openssl-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util-test.cc b/be/src/util/openssl-util-test.cc
index 8d98b0d..76f65a5 100644
--- a/be/src/util/openssl-util-test.cc
+++ b/be/src/util/openssl-util-test.cc
@@ -44,6 +44,41 @@ class OpenSSLUtilTest : public ::testing::Test {
     }
   }
 
+  /// Fill arbitrary-length buffer with random bytes
+  void GenerateRandomBytes(uint8_t* data, int64_t len) {
+    DCHECK_GE(len, 0);
+    for (int64_t i = 0; i < len; i++) {
+      data[i] = uniform_int_distribution<uint8_t>(0, UINT8_MAX)(rng_);
+    }
+  }
+
+  void TestEncryptionDecryption(const int64_t buffer_size) {
+    vector<uint8_t> original(buffer_size);
+    vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
+    if (buffer_size % 8 == 0) {
+      GenerateRandomData(original.data(), buffer_size);
+    } else {
+      GenerateRandomBytes(original.data(), buffer_size);
+    }
+
+    // Check all the modes
+    AES_CIPHER_MODE modes[] = {AES_256_GCM, AES_256_CTR, AES_256_CFB};
+    for (auto m : modes) {
+      memcpy(scratch.data(), original.data(), buffer_size);
+
+      EncryptionKey key;
+      key.InitializeRandom();
+      key.SetCipherMode(m);
+
+      ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
+      // Check that encryption did something
+      ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
+      ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
+      // Check that we get the original data back.
+      ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
+    }
+  }
+
   mt19937_64 rng_;
 };
 
@@ -57,7 +92,7 @@ TEST_F(OpenSSLUtilTest, Encryption) {
   GenerateRandomData(original.data(), buffer_size);
 
   // Check both CTR & CFB
-  AES_CIPHER_MODE modes[] = {AES_256_CTR, AES_256_CFB};
+  AES_CIPHER_MODE modes[] = {AES_256_GCM, AES_256_CTR, AES_256_CFB};
   for (auto m : modes) {
     // Iterate multiple times to ensure that key regeneration works correctly.
     EncryptionKey key;
@@ -85,44 +120,42 @@ TEST_F(OpenSSLUtilTest, Encryption) {
 /// Test that encryption and decryption work in-place.
 TEST_F(OpenSSLUtilTest, EncryptInPlace) {
   const int buffer_size = 1024 * 1024;
-  vector<uint8_t> original(buffer_size);
-  vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
-
-  EncryptionKey key;
-  // Check both CTR & CFB
-  AES_CIPHER_MODE modes[] = {AES_256_CTR, AES_256_CFB};
-  for (auto m : modes) {
-    GenerateRandomData(original.data(), buffer_size);
-    memcpy(scratch.data(), original.data(), buffer_size);
-
-    key.InitializeRandom();
-    key.SetCipherMode(m);
-
-    ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
-    // Check that encryption did something
-    ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
-    ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
-    // Check that we get the original data back.
-    ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
-  }
+  TestEncryptionDecryption(buffer_size);
 }
 
 /// Test that encryption works with buffer lengths that don't fit in a 32-bit integer.
 TEST_F(OpenSSLUtilTest, EncryptInPlaceHugeBuffer) {
   const int64_t buffer_size = 3 * 1024L * 1024L * 1024L;
-  vector<uint8_t> original(buffer_size);
-  vector<uint8_t> scratch(buffer_size); // Scratch buffer for in-place encryption.
-  GenerateRandomData(original.data(), buffer_size);
-  memcpy(scratch.data(), original.data(), buffer_size);
+  TestEncryptionDecryption(buffer_size);
+}
+
+/// Test that encryption works with arbitrary-length buffer
+TEST_F(OpenSSLUtilTest, EncryptArbitraryLength) {
+  std::uniform_int_distribution<uint64_t> dis(0, 1024 * 1024);
+  const int buffer_size = dis(rng_);
+  TestEncryptionDecryption(buffer_size);
+}
+
+/// Test integrity in GCM mode
+TEST_F(OpenSSLUtilTest, GcmIntegrity) {
+  const int buffer_size = 1024 * 1024;
+  vector<uint8_t> buffer(buffer_size);
 
   EncryptionKey key;
   key.InitializeRandom();
-  ASSERT_OK(key.Encrypt(scratch.data(), buffer_size, scratch.data()));
-  // Check that encryption did something
-  ASSERT_NE(0, memcmp(original.data(), scratch.data(), buffer_size));
-  ASSERT_OK(key.Decrypt(scratch.data(), buffer_size, scratch.data()));
-  // Check that we get the original data back.
-  ASSERT_EQ(0, memcmp(original.data(), scratch.data(), buffer_size));
+  key.SetCipherMode(AES_256_GCM);
+
+  // Even it has been set as GCM mode, it may fall back to other modes.
+  // Check if GCM mode is supported at runtime.
+  if (key.IsGcmMode()) {
+    GenerateRandomData(buffer.data(), buffer_size);
+    ASSERT_OK(key.Encrypt(buffer.data(), buffer_size, buffer.data()));
+
+    // tamper the data
+    ++buffer[0];
+    Status s = key.Decrypt(buffer.data(), buffer_size, buffer.data());
+    EXPECT_STR_CONTAINS(s.GetDetail(), "EVP_DecryptFinal");
+  }
 }
 
 /// Test basic integrity hash functionality.

http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/util/openssl-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.cc b/be/src/util/openssl-util.cc
index 69dc676..ffb47eb 100644
--- a/be/src/util/openssl-util.cc
+++ b/be/src/util/openssl-util.cc
@@ -20,6 +20,7 @@
 #include <limits.h>
 #include <sstream>
 
+#include <glog/logging.h>
 #include <openssl/err.h>
 #include <openssl/evp.h>
 #include <openssl/rand.h>
@@ -30,6 +31,7 @@
 #include "gutil/strings/substitute.h"
 
 #include "common/names.h"
+#include "cpu-info.h"
 
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -107,19 +109,20 @@ void EncryptionKey::InitializeRandom() {
   }
   RAND_bytes(key_, sizeof(key_));
   RAND_bytes(iv_, sizeof(iv_));
+  memset(gcm_tag_, 0, sizeof(gcm_tag_));
   initialized_ = true;
 }
 
-Status EncryptionKey::Encrypt(const uint8_t* data, int64_t len, uint8_t* out) const {
+Status EncryptionKey::Encrypt(const uint8_t* data, int64_t len, uint8_t* out) {
   return EncryptInternal(true, data, len, out);
 }
 
-Status EncryptionKey::Decrypt(const uint8_t* data, int64_t len, uint8_t* out) const {
+Status EncryptionKey::Decrypt(const uint8_t* data, int64_t len, uint8_t* out) {
   return EncryptInternal(false, data, len, out);
 }
 
 Status EncryptionKey::EncryptInternal(
-    bool encrypt, const uint8_t* data, int64_t len, uint8_t* out) const {
+    bool encrypt, const uint8_t* data, int64_t len, uint8_t* out) {
   DCHECK(initialized_);
   DCHECK_GE(len, 0);
   // Create and initialize the context for encryption
@@ -127,6 +130,10 @@ Status EncryptionKey::EncryptInternal(
   EVP_CIPHER_CTX_init(&ctx);
   EVP_CIPHER_CTX_set_padding(&ctx, 0);
 
+  if (IsGcmMode()) {
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_SET_IVLEN, AES_BLOCK_SIZE, NULL);
+  }
+
   // Start encryption/decryption.  We use a 256-bit AES key, and the cipher block mode
   // is either CTR or CFB(stream cipher), both of which support arbitrary length
   // ciphertexts - it doesn't have to be a multiple of 16 bytes. Additionally, CTR
@@ -157,6 +164,11 @@ Status EncryptionKey::EncryptInternal(
     offset += in_len;
   }
 
+  if (IsGcmMode() && !encrypt) {
+    // Set expected tag value
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_SET_TAG, AES_BLOCK_SIZE, gcm_tag_);
+  }
+
   // Finalize encryption or decryption.
   int final_out_len;
   success = encrypt ? EVP_EncryptFinal_ex(&ctx, out + offset, &final_out_len) :
@@ -164,21 +176,93 @@ Status EncryptionKey::EncryptInternal(
   if (success != 1) {
     return OpenSSLErr(encrypt ? "EVP_EncryptFinal" : "EVP_DecryptFinal");
   }
-  // Again safe due to CTR/CFB with no padding
+
+  if (IsGcmMode() && encrypt) {
+    EVP_CIPHER_CTX_ctrl(&ctx, EVP_CTRL_GCM_GET_TAG, AES_BLOCK_SIZE, gcm_tag_);
+  }
+
+  // Again safe due to GCM/CTR/CFB with no padding
   DCHECK_EQ(final_out_len, 0);
   return Status::OK();
 }
 
+/// OpenSSL 1.0.1d
+#define OPENSSL_VERSION_1_0_1D 0x1000104fL
+
+/// If not defined at compile time, define them manually
+/// see: openssl/evp.h
+#ifndef EVP_CIPH_GCM_MODE
+#define EVP_CTRL_GCM_SET_IVLEN 0x9
+#define EVP_CTRL_GCM_GET_TAG 0x10
+#define EVP_CTRL_GCM_SET_TAG 0x11
+#endif
+
 extern "C" {
 ATTRIBUTE_WEAK
 const EVP_CIPHER* EVP_aes_256_ctr();
+
+ATTRIBUTE_WEAK
+const EVP_CIPHER* EVP_aes_256_gcm();
 }
 
 const EVP_CIPHER* EncryptionKey::GetCipher() const {
   // use weak symbol to avoid compiling error on OpenSSL 1.0.0 environment
-  if (mode_ == AES_256_CTR && EVP_aes_256_ctr) return EVP_aes_256_ctr();
+  if (mode_ == AES_256_CTR) return EVP_aes_256_ctr();
+  if (mode_ == AES_256_GCM) return EVP_aes_256_gcm();
 
-  // otherwise, fallback to CFB mode
   return EVP_aes_256_cfb();
 }
+
+void EncryptionKey::SetCipherMode(AES_CIPHER_MODE m) {
+  mode_ = m;
+
+  if (!IsModeSupported(m)) {
+    mode_ = GetSupportedDefaultMode();
+    LOG(WARNING) << Substitute("$0 is not supported, fall back to $1.",
+        ModeToString(m), ModeToString(mode_));
+  }
+}
+
+bool EncryptionKey::IsModeSupported(AES_CIPHER_MODE m) const {
+  switch (m) {
+    case AES_256_GCM:
+      // It becomes a bit tricky for GCM mode, because GCM mode is enabled since
+      // OpenSSL 1.0.1, but the tag validation only works since 1.0.1d. We have
+      // to make sure that OpenSSL version >= 1.0.1d for GCM. So we need
+      // SSLeay(). Note that SSLeay() may return the compiling version on
+      // certain platforms if it was built against an older version(see:
+      // IMPALA-6418). In this case, it will return false, and EncryptionKey
+      // will try to fall back to CTR mode, so it is not ideal but is OK to use
+      // SSLeay() for GCM mode here since in the worst case, we will be using
+      // AES_256_CTR in a system that supports AES_256_GCM.
+      return (CpuInfo::IsSupported(CpuInfo::PCLMULQDQ)
+          && SSLeay() >= OPENSSL_VERSION_1_0_1D && EVP_aes_256_gcm);
+
+    case AES_256_CTR:
+      // If TLS1.2 is supported, then we're on a verison of OpenSSL that
+      // supports AES-256-CTR.
+      return (MaxSupportedTlsVersion() >= TLS1_2_VERSION && EVP_aes_256_ctr);
+
+    case AES_256_CFB:
+      return true;
+
+    default:
+      return false;
+  }
+}
+
+AES_CIPHER_MODE EncryptionKey::GetSupportedDefaultMode() const {
+  if (IsModeSupported(AES_256_GCM)) return AES_256_GCM;
+  if (IsModeSupported(AES_256_CTR)) return AES_256_CTR;
+  return AES_256_CFB;
+}
+
+const string EncryptionKey::ModeToString(AES_CIPHER_MODE m) const {
+  switch(m) {
+    case AES_256_GCM: return "AES-GCM";
+    case AES_256_CTR: return "AES-CTR";
+    case AES_256_CFB: return "AES-CFB";
+  }
+  return "Unknown mode";
+}
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/9b68645f/be/src/util/openssl-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/openssl-util.h b/be/src/util/openssl-util.h
index 7b1b28e..ef53425 100644
--- a/be/src/util/openssl-util.h
+++ b/be/src/util/openssl-util.h
@@ -60,9 +60,9 @@ bool IsExternalTlsConfigured();
 void SeedOpenSSLRNG();
 
 enum AES_CIPHER_MODE {
-  AES_256_CTR,
   AES_256_CFB,
-  AES_256_GCM // not supported now.
+  AES_256_CTR,
+  AES_256_GCM
 };
 
 /// The hash of a data buffer used for checking integrity. A SHA256 hash is used
@@ -83,43 +83,56 @@ class IntegrityHash {
 /// The key and initialization vector (IV) required to encrypt and decrypt a buffer of
 /// data. This should be regenerated for each buffer of data.
 ///
-/// We use AES with a 256-bit key and CTR/CFB cipher block mode, which gives us a stream
-/// cipher that can support arbitrary-length ciphertexts. If OpenSSL version at runtime
-/// is 1.0.1 or above, CTR mode is used, otherwise CFB mode is used. The IV is used as
+/// We use AES with a 256-bit key and GCM/CTR/CFB cipher block mode, which gives us a
+/// stream cipher that can support arbitrary-length ciphertexts. The mode is chosen
+/// depends on the OpenSSL version & the hardware support at runtime. The IV is used as
 /// an input to the cipher as the "block to supply before the first block of plaintext".
 /// This is required because all ciphers (except the weak ECB) are built such that each
 /// block depends on the output from the previous block. Since the first block doesn't
 /// have a previous block, we supply this IV. Think of it  as starting off the chain of
 /// encryption.
+///
+/// Notes for GCM:
+/// (1) GCM mode was supported since OpenSSL 1.0.1, however the tag verification
+/// in decryption was only supported since OpenSSL 1.0.1d.
+/// (2) The plaintext and the Additional Authenticated Data(AAD) are the two
+/// categories of data that GCM protects. GCM protects the authenticity of the
+/// plaintext and the AAD, and GCM also protects the confidentiality of the
+/// plaintext. The AAD itself is not required or won't change the security.
+/// In our case(Spill to Disk), we just ignore the AAD.
+
 class EncryptionKey {
  public:
-  EncryptionKey() : initialized_(false) {
-    // If TLS1.2 is supported, then we're on a verison of OpenSSL that supports
-    // AES-256-CTR.
-    mode_ = MaxSupportedTlsVersion() < TLS1_2_VERSION ? AES_256_CFB : AES_256_CTR;
-  }
-
-  /// Initialize a key for temporary use with randomly generated data. Reinitializes with
-  /// new random values if the key was already initialized. We use AES-CTR/AES-CFB mode
-  /// so key/IV pairs should not be reused. This function automatically reseeds the RNG
-  /// periodically, so callers do not need to do it.
+  EncryptionKey() : initialized_(false) { mode_ = GetSupportedDefaultMode(); }
+
+  /// Initializes a key for temporary use with randomly generated data, and clears the
+  /// tag for GCM mode. Reinitializes with new random values if the key was already
+  /// initialized. We use AES-GCM/AES-CTR/AES-CFB mode so key/IV pairs should not be
+  /// reused. This function automatically reseeds the RNG periodically, so callers do
+  /// not need to do it.
   void InitializeRandom();
 
   /// Encrypts a buffer of input data 'data' of length 'len' into an output buffer 'out'.
   /// Exactly 'len' bytes will be written to 'out'. This key must be initialized before
   /// calling. Operates in-place if 'in' == 'out', otherwise the buffers must not overlap.
-  Status Encrypt(const uint8_t* data, int64_t len, uint8_t* out) const WARN_UNUSED_RESULT;
+  /// For GCM mode, the hash tag will be kept inside(gcm_tag_ variable).
+  Status Encrypt(const uint8_t* data, int64_t len, uint8_t* out) WARN_UNUSED_RESULT;
 
   /// Decrypts a buffer of input data 'data' of length 'len' that was encrypted with this
   /// key into an output buffer 'out'. Exactly 'len' bytes will be written to 'out'.
   /// This key must be initialized before calling. Operates in-place if 'in' == 'out',
-  /// otherwise the buffers must not overlap.
-  Status Decrypt(const uint8_t* data, int64_t len, uint8_t* out) const WARN_UNUSED_RESULT;
+  /// otherwise the buffers must not overlap. For GCM mode, the hash tag, which is
+  /// computed during encryption, will be used for intgerity verification.
+  Status Decrypt(const uint8_t* data, int64_t len, uint8_t* out) WARN_UNUSED_RESULT;
 
   /// Specify a cipher mode. Currently used only for testing but maybe in future we
   /// can provide a configuration option for the end user who can choose a preferred
   /// mode(GCM, CTR, CFB...) based on their software/hardware environment.
-  void SetCipherMode(AES_CIPHER_MODE m) { mode_ = m; }
+  /// If not supported, fall back to the supported mode at runtime.
+  void SetCipherMode(AES_CIPHER_MODE m);
+
+  /// If is GCM mode at runtime
+  bool IsGcmMode() const { return mode_ == AES_256_GCM; }
 
  private:
   /// Helper method that encrypts/decrypts if 'encrypt' is true/false respectively.
@@ -128,13 +141,25 @@ class EncryptionKey {
   /// This key must be initialized before calling. Operates in-place if 'in' == 'out',
   /// otherwise the buffers must not overlap.
   Status EncryptInternal(bool encrypt, const uint8_t* data, int64_t len,
-      uint8_t* out) const WARN_UNUSED_RESULT;
+      uint8_t* out) WARN_UNUSED_RESULT;
+
+  /// Check if mode m is supported at runtime
+  bool IsModeSupported(AES_CIPHER_MODE m) const;
+
+  /// Returns the a default mode which is supported at runtime. If GCM mode
+  /// is supported, return AES_256_GCM as the default. If GCM is not supported,
+  /// but CTR is still supported, return AES_256_CTR. When both GCM and
+  /// CTR modes are not supported, return AES_256_CFB.
+  AES_CIPHER_MODE GetSupportedDefaultMode() const;
+
+  /// Converts mode type to string.
+  const string ModeToString(AES_CIPHER_MODE m) const;
 
   /// Track whether this key has been initialized, to avoid accidentally using
   /// uninitialized keys.
   bool initialized_;
 
-  /// return a EVP_CIPHER according to cipher mode at runtime
+  /// Returns a EVP_CIPHER according to cipher mode at runtime
   const EVP_CIPHER* GetCipher() const;
 
   /// An AES 256-bit key.
@@ -143,6 +168,9 @@ class EncryptionKey {
   /// An initialization vector to feed as the first block to AES.
   uint8_t iv_[AES_BLOCK_SIZE];
 
+  /// Tag for GCM mode
+  uint8_t gcm_tag_[AES_BLOCK_SIZE];
+
   /// Cipher Mode
   AES_CIPHER_MODE mode_;
 };


[2/3] impala git commit: IMPALA-6448: Re-enable kerberized testing with KRPC

Posted by sa...@apache.org.
IMPALA-6448: Re-enable kerberized testing with KRPC

For the patch for IMPALA-5054, we realized that we needed to make
the kudu::rpc::Messenger configurable. A patch was done on the Kudu
side which is tracked by KUDU-2228. As part of that patch, one of
the design decisions taken was to only allow kerberos either on or
off for the entirety of the process life. This means that we cannot
switch kerberos on and off in the same process any more with KRPC.
This behavior can be found in SaslInit() in kudu/rpc/sasl_common.cc
as SaslInit() which is called once per process will hard code some
configuration which cannot be toggled.

This affected our kerberized rpc-mgr-tests. This patch splits out
the kerberized part of rpc-mgr-test into rpc-mgr-kerberized-test.

It also puts the common code between both the files into
rpc-mgr-test-base.h

Change-Id: I6412978316de90875c98f8fbe51c8d215c227b18
Reviewed-on: http://gerrit.cloudera.org:8080/9164
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9ac9f7e3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9ac9f7e3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9ac9f7e3

Branch: refs/heads/master
Commit: 9ac9f7e3f8f327a3cade75548a85106190c8842c
Parents: 30c0375
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jan 30 21:35:28 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 5 22:49:07 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/CMakeLists.txt             |   7 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc |  89 ++++++++
 be/src/rpc/rpc-mgr-test-base.h        | 269 +++++++++++++++++++++++
 be/src/rpc/rpc-mgr-test.cc            | 333 ++---------------------------
 4 files changed, 381 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 7beb80d..e4d96e2 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -50,7 +50,12 @@ ADD_BE_TEST(rpc-mgr-test)
 add_dependencies(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
-target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
+
+ADD_BE_TEST(rpc-mgr-kerberized-test)
+add_dependencies(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test rpc_test_proto)
+target_link_libraries(rpc-mgr-kerberized-test security-test-for-impala)
+target_link_libraries(rpc-mgr-kerberized-test ${KRB5_REALM_OVERRIDE})
 
 add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
 add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
new file mode 100644
index 0000000..57ed3eb
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr-test-base.h"
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+
+namespace impala {
+
+static int kdc_port = GetServerPort();
+
+class RpcMgrKerberizedTest :
+    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    string spn = Substitute("impala-test/$0", ip);
+
+    kdc_wrapper_.reset(new MiniKdcWrapper(
+        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
+    DCHECK(kdc_wrapper_.get() != nullptr);
+
+    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
+    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
+
+    RpcMgrTestBase::SetUp();
+  }
+
+  virtual void TearDown() {
+    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
+    RpcMgrTestBase::TearDown();
+  }
+
+ private:
+  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
+};
+
+INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
+                        RpcMgrKerberizedTest,
+                        ::testing::Values(USE_KUDU_KERBEROS,
+                                          USE_IMPALA_KERBEROS));
+
+TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+  // TODO: We're starting a seperate RpcMgr here instead of configuring
+  // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
+  // new gtest params to turn on TLS which needs to be a coordinated change across
+  // rpc-mgr-test and thrift-server-test.
+  RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
+  TNetworkAddress tls_krpc_address;
+  IpAddr ip;
+  ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+
+  int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
+  tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
+
+  // Enable TLS.
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
+  ASSERT_OK(tls_rpc_mgr.Init());
+
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  tls_rpc_mgr.Shutdown();
+}
+
+} // namespace impala
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
+
+  // Fill in the path of the current binary for use by the tests.
+  CURRENT_EXECUTABLE_PATH = argv[0];
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
new file mode 100644
index 0000000..43b6d83
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "rpc/auth-provider.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+#include "testutil/mini-kdc-wrapper.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/counting-barrier.h"
+#include "util/network-util.h"
+#include "util/openssl-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+
+#include "common/names.h"
+
+using kudu::rpc::ServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+
+// The path of the current executable file that is required for passing into the SASL
+// library as the 'application name'.
+static string CURRENT_EXECUTABLE_PATH;
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
+
+int GetServerPort() {
+  int port = FindUnusedEphemeralPort(nullptr);
+  EXPECT_FALSE(port == -1);
+  return port;
+}
+
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+/// Use this class to set the appropriate required TLS flags for the duration of the
+/// lifetime of the object.
+/// It is assumed that the flags always hold empty values by default.
+class ScopedSetTlsFlags {
+ public:
+  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
+      const string& pkey_passwd = "", const string& ciphers = "") {
+    FLAGS_ssl_server_certificate = cert;
+    FLAGS_ssl_private_key = pkey;
+    FLAGS_ssl_client_ca_certificate = ca_cert;
+    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
+    FLAGS_ssl_cipher_list = ciphers;
+  }
+
+  ~ScopedSetTlsFlags() {
+    FLAGS_ssl_server_certificate = "";
+    FLAGS_ssl_private_key = "";
+    FLAGS_ssl_client_ca_certificate = "";
+    FLAGS_ssl_private_key_password_cmd = "";
+    FLAGS_ssl_cipher_list = "";
+  }
+};
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
+#define PAYLOAD_SIZE (4096)
+
+template <class T> class RpcMgrTestBase : public T {
+ public:
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+  MemTracker* service_tracker() { return &service_tracker_; }
+
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() {
+    rpc_mgr_.Shutdown();
+  }
+
+ private:
+  int32_t payload_[PAYLOAD_SIZE];
+  MemTracker service_tracker_;
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
+  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
+
+  virtual void Ping(
+      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
+    response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    cb_(context);
+  }
+
+ private:
+  MemTracker* mem_tracker_;
+  ServiceCB cb_;
+};
+
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
+  }
+
+  // The request comes with an int 'pattern' and a payload of int array sent with
+  // sidecar. Scan the array to make sure every element matches 'pattern'.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_->Release(context->GetTransferSize());
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $1; Found $2", pattern, val)));
+        return;
+      }
+    }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
+    context->RespondSuccess();
+  }
+
+ private:
+  MemTracker* mem_tracker_;
+
+};
+
+template <class T>
+Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  MemTracker* mem_tracker = test_base->service_tracker();
+  // Test that a service can be started, and will respond to requests.
+  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
+
+  RpcController controller;
+  srand(0);
+  // Randomly invoke either services to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+          return Status(Substitute(
+              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+                  response.int_response()));
+      }
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      test_base->SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+          "unable to execute ScanMem() RPC.");
+    }
+  }
+
+  return Status::OK();
+}
+
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/9ac9f7e3/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index c525148..4c4b100 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -15,124 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr.inline.h"
-
-#include "common/init.h"
-#include "exec/kudu-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "rpc/auth-provider.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-#include "testutil/mini-kdc-wrapper.h"
-#include "testutil/scoped-flag-setter.h"
-#include "util/counting-barrier.h"
-#include "util/network-util.h"
-#include "util/openssl-util.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/rpc_test.proxy.h"
-#include "gen-cpp/rpc_test.service.h"
-
-#include "common/names.h"
-
-using kudu::rpc::ErrorStatusPB;
+#include "rpc/rpc-mgr-test-base.h"
+
 using kudu::rpc::ServiceIf;
 using kudu::rpc::RpcController;
 using kudu::rpc::RpcContext;
-using kudu::rpc::RpcSidecar;
 using kudu::MonoDelta;
-using kudu::Slice;
-
-using namespace std;
 
 DECLARE_int32(num_reactor_threads);
 DECLARE_int32(num_acceptor_threads);
 DECLARE_string(hostname);
 
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-
-// The path of the current executable file that is required for passing into the SASL
-// library as the 'application name'.
-static string CURRENT_EXECUTABLE_PATH;
-
 namespace impala {
 
-static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);
-
-int GetServerPort() {
-  int port = FindUnusedEphemeralPort(nullptr);
-  EXPECT_FALSE(port == -1);
-  return port;
-}
-
-static int kdc_port = GetServerPort();
-
-const static string IMPALA_HOME(getenv("IMPALA_HOME"));
-const string& SERVER_CERT =
-    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
-const string& PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
-const string& BAD_SERVER_CERT =
-    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
-const string& BAD_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
-const string& PASSWORD_PROTECTED_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
-
-// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
-// support.
-const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
-const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
-
-#define PAYLOAD_SIZE (4096)
-
-template <class T> class RpcMgrTestBase : public T {
- public:
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
-  MemTracker* service_tracker() { return &service_tracker_; }
-
- protected:
-  TNetworkAddress krpc_address_;
-  RpcMgr rpc_mgr_;
-
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
-    ASSERT_OK(rpc_mgr_.Init());
-  }
-
-  virtual void TearDown() {
-    rpc_mgr_.Shutdown();
-  }
-
- private:
-  int32_t payload_[PAYLOAD_SIZE];
-  MemTracker service_tracker_;
-};
-
 // For tests that do not require kerberized testing, we use RpcTest.
 class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   virtual void SetUp() {
@@ -144,157 +39,7 @@ class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
   }
 };
 
-class RpcMgrKerberizedTest :
-    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-  virtual void SetUp() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    string spn = Substitute("impala-test/$0", ip);
-
-    kdc_wrapper_.reset(new MiniKdcWrapper(
-        std::move(spn), "KRBTEST.COM", "24h", "7d", kdc_port));
-    DCHECK(kdc_wrapper_.get() != nullptr);
-
-    ASSERT_OK(kdc_wrapper_->SetupAndStartMiniKDC(GetParam()));
-    ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
-
-    RpcMgrTestBase::SetUp();
-  }
-
-  virtual void TearDown() {
-    ASSERT_OK(kdc_wrapper_->TearDownMiniKDC(GetParam()));
-    RpcMgrTestBase::TearDown();
-  }
-
- private:
-  boost::scoped_ptr<MiniKdcWrapper> kdc_wrapper_;
-};
-
-typedef std::function<void(RpcContext*)> ServiceCB;
-
-class PingServiceImpl : public PingServiceIf {
- public:
-  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
-  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
-      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
-
-  virtual void Ping(
-      const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
-    response->set_int_response(42);
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    cb_(context);
-  }
-
- private:
-  MemTracker* mem_tracker_;
-  ServiceCB cb_;
-};
-
-class ScanMemServiceImpl : public ScanMemServiceIf {
- public:
-  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
-  }
-
-  // The request comes with an int 'pattern' and a payload of int array sent with
-  // sidecar. Scan the array to make sure every element matches 'pattern'.
-  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) {
-    int32_t pattern = request->pattern();
-    Slice payload;
-    ASSERT_OK(
-        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
-    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
-
-    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
-    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
-      int32_t val = v[i];
-      if (val != pattern) {
-        // Incoming requests will already be tracked and we need to release the memory.
-        mem_tracker_->Release(context->GetTransferSize());
-        context->RespondFailure(kudu::Status::Corruption(
-            Substitute("Expecting $1; Found $2", pattern, val)));
-        return;
-      }
-    }
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
-    context->RespondSuccess();
-  }
-
- private:
-  MemTracker* mem_tracker_;
-};
-
-// TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
-// Re-enable after fixing.
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_OFF));
-
-template <class T>
-Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
-    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  MemTracker* mem_tracker = test_base->service_tracker();
-  // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
-
-  // Test that a second service, that verifies the RPC payload is not corrupted,
-  // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
-
-  FLAGS_num_acceptor_threads = 2;
-  FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
-
-  unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));
-
-  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));
-
-  RpcController controller;
-  srand(0);
-  // Randomly invoke either services to make sure a RpcMgr can host multiple
-  // services at the same time.
-  for (int i = 0; i < 100; ++i) {
-    controller.Reset();
-    if (random() % 2 == 0) {
-      PingRequestPB request;
-      PingResponsePB response;
-      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
-          "unable to execute Ping() RPC.");
-      if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
-      }
-    } else {
-      ScanMemRequestPB request;
-      ScanMemResponsePB response;
-      test_base->SetupScanMemRequest(&request, &controller);
-      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
-          "unable to execute ScanMem() RPC.");
-    }
-  }
-
-  return Status::OK();
-}
-
-
-TEST_F(RpcMgrTest, MultipleServices) {
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
-}
-
-TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+TEST_F(RpcMgrTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change across
@@ -307,28 +52,20 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  // Enable TLS.
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
   ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
+TEST_F(RpcMgrTest, MultipleServices) {
+  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+}
+
 // Test with a misconfigured TLS certificate and verify that an error is thrown.
 TEST_F(RpcMgrTest, BadCertificateTls) {
-
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, "unknown");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -344,16 +81,8 @@ TEST_F(RpcMgrTest, BadCertificateTls) {
 
 // Test with a bad password command for the password protected private key.
 TEST_F(RpcMgrTest, BadPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo badpassword");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo badpassword");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -369,16 +98,8 @@ TEST_F(RpcMgrTest, BadPasswordTls) {
 
 // Test with a correct password command for the password protected private key.
 TEST_F(RpcMgrTest, CorrectPasswordTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key, PASSWORD_PROTECTED_PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto password_cmd =
-      ScopedFlagSetter<string>::Make(
-          &FLAGS_ssl_private_key_password_cmd, "echo password");
+  ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
+      "echo password");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -395,14 +116,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
 
 // Test with a bad TLS cipher and verify that an error is thrown.
 TEST_F(RpcMgrTest, BadCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, "not_a_cipher");
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", "not_a_cipher");
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -418,14 +132,8 @@ TEST_F(RpcMgrTest, BadCiphersTls) {
 
 // Test with a valid TLS cipher.
 TEST_F(RpcMgrTest, ValidCiphersTls) {
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, TLS1_0_COMPATIBLE_CIPHER);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "",
+      TLS1_0_COMPATIBLE_CIPHER);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;
@@ -444,14 +152,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
 TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
       TLS1_0_COMPATIBLE_CIPHER_2);
-  auto cert_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_server_certificate, SERVER_CERT);
-  auto pkey_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_private_key, PRIVATE_KEY);
-  auto ca_flag =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_client_ca_certificate, SERVER_CERT);
-  auto cipher =
-      ScopedFlagSetter<string>::Make(&FLAGS_ssl_cipher_list, cipher_list);
+  ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list);
 
   RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
   TNetworkAddress tls_krpc_address;