You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/30 06:18:21 UTC

[7/7] impala git commit: Add missing authorization in KRPC

Add missing authorization in KRPC

In 2.12.0, Impala adopted Kudu RPC library for certain backened services
(TransmitData(), EndDataStream()). While the implementation uses Kerberos
for authenticating users connecting to the backend services, there is no
authorization implemented. This is a regression from the Thrift based
implementation because it registered a SASL callback (SaslAuthorizeInternal)
to be invoked during the connection negotiation. With this regression,
an unauthorized but authenticated user may invoke RPC calls to Impala backend
services.

This change fixes the issue above by overriding the default authorization method
for the DataStreamService. The authorization method will only let authenticated
principal which matches FLAGS_principal / FLAGS_be_principal to access the service.
Also added a new startup flag --krb5_ccname to allow users to customize the locations
of the Kerberos credentials cache.

Testing done:
1. Added a new test case in rpc-mgr-kerberized-test.cc to confirm an unauthorized
user is not allowed to access the service.
2. Ran some queries in a Kerberos enabled cluster to make sure there is no error.
3. Exhaustive builds.

Thanks to Todd Lipcon for pointing out the problem and his guidance on the fix.

Change-Id: I2f82dee5e721f2ed23e75fd91abbc6ab7addd4c5
Reviewed-on: http://gerrit.cloudera.org:8080/11331
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5c541b96
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5c541b96
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5c541b96

Branch: refs/heads/master
Commit: 5c541b960491ba91533712144599fb3b6d99521d
Parents: b97e5ba
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Aug 23 00:33:16 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 30 04:06:09 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc             |   2 +
 be/src/rpc/authentication.cc              |  16 +-
 be/src/rpc/rpc-mgr-kerberized-test.cc     | 127 +++++++++--
 be/src/rpc/rpc-mgr-test-base.h            | 284 -----------------------
 be/src/rpc/rpc-mgr-test.cc                |  39 ++--
 be/src/rpc/rpc-mgr-test.h                 | 300 +++++++++++++++++++++++++
 be/src/rpc/rpc-mgr.cc                     |  33 ++-
 be/src/rpc/rpc-mgr.h                      |  10 +
 be/src/rpc/thrift-server-test.cc          |  19 +-
 be/src/runtime/data-stream-test.cc        |   5 +
 be/src/service/data-stream-service.cc     |   6 +
 be/src/service/data-stream-service.h      |   6 +
 be/src/testutil/mini-kdc-wrapper.cc       |  32 ++-
 be/src/testutil/mini-kdc-wrapper.h        |  44 ++--
 be/src/util/auth-util.cc                  |  37 ++-
 be/src/util/auth-util.h                   |   6 +-
 bin/rat_exclude_files.txt                 |   1 +
 common/protobuf/data_stream_service.proto |   5 +
 common/protobuf/kudu                      |   1 +
 common/protobuf/rpc_test.proto            |   6 +
 20 files changed, 588 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 5c5dc03..1267072 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -47,6 +47,8 @@ DEFINE_string(principal, "", "Kerberos principal. If set, both client and backen
 DEFINE_string(be_principal, "", "Kerberos principal for backend network connections only,"
     "overriding --principal if set. Must not be set if --principal is not set.");
 DEFINE_string(keytab_file, "", "Absolute path to Kerberos keytab file");
+DEFINE_string(krb5_ccname, "/tmp/krb5cc_impala_internal", "Absolute path to the file "
+    "based credentials cache that we pass to the KRB5CCNAME environment variable.");
 DEFINE_string(krb5_conf, "", "Absolute path to Kerberos krb5.conf if in a non-standard "
     "location. Does not normally need to be set.");
 DEFINE_string(krb5_debug_file, "", "Turn on Kerberos debugging and output to this file");

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index c22ab88..0b5b5d9 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -62,12 +62,13 @@ using boost::algorithm::trim;
 using boost::mt19937;
 using boost::uniform_int;
 using namespace apache::thrift;
-using namespace boost::filesystem;   // for is_regular()
+using namespace boost::filesystem;   // for is_regular(), is_absolute()
 using namespace strings;
 
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
 DECLARE_string(be_principal);
+DECLARE_string(krb5_ccname);
 DECLARE_string(krb5_conf);
 DECLARE_string(krb5_debug_file);
 
@@ -123,10 +124,6 @@ static vector<sasl_callback_t> LDAP_EXT_CALLBACKS;  // External LDAP connections
 // the same 'appname' or InitAuth() will fail.
 static string APP_NAME;
 
-// Path to the file based credential cache that we pass to the KRB5CCNAME environment
-// variable.
-static const string KRB5CCNAME_PATH = "/tmp/krb5cc_impala_internal";
-
 // Constants for the two Sasl mechanisms we support
 static const string KERBEROS_MECHANISM = "GSSAPI";
 static const string PLAIN_MECHANISM = "PLAIN";
@@ -732,7 +729,12 @@ Status AuthManager::InitKerberosEnv() {
   // is normally fine, but if you're not running impala daemons as user
   // 'impala', the kinit we perform is going to blow away credentials for the
   // current user.  Not setting this isn't technically fatal, so ignore errors.
-  (void) setenv("KRB5CCNAME", "/tmp/krb5cc_impala_internal", 1);
+  const path krb5_ccname_path(FLAGS_krb5_ccname);
+  if (!krb5_ccname_path.is_absolute()) {
+    return Status(Substitute("Bad --krb5_ccname value: $0 is not an absolute file path",
+        FLAGS_krb5_ccname));
+  }
+  discard_result(setenv("KRB5CCNAME", FLAGS_krb5_ccname.c_str(), 1));
 
   // If an alternate krb5_conf location is supplied, set both KRB5_CONFIG and
   // JAVA_TOOL_OPTIONS in the environment.
@@ -785,7 +787,7 @@ Status SaslAuthProvider::Start() {
     // Starts a thread that periodically does a 'kinit'. The thread lives as long as the
     // process does.
     KUDU_RETURN_IF_ERROR(kudu::security::InitKerberosForServer(principal_, keytab_file_,
-        KRB5CCNAME_PATH, false), "Could not init kerberos");
+        FLAGS_krb5_ccname, false), "Could not init kerberos");
     LOG(INFO) << "Kerberos ticket granted to " << principal_;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-kerberized-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index c6b95c8..0121787 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr-test-base.h"
+#include "rpc/rpc-mgr-test.h"
+
+#include "exec/kudu-util.h"
+#include "rpc/auth-provider.h"
 #include "service/fe-support.h"
+#include "testutil/mini-kdc-wrapper.h"
 
 DECLARE_string(be_principal);
 DECLARE_string(hostname);
+DECLARE_string(keytab_file);
+DECLARE_string(krb5_ccname);
 DECLARE_string(principal);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_server_certificate);
@@ -27,32 +33,33 @@ DECLARE_string(ssl_private_key);
 
 // The principal name and the realm used for creating the mini-KDC.
 // To be initialized at main().
-static string kdc_principal;
-static string kdc_realm;
+static string kdc_ccname;
+static string principal;
+static string principal_kt_path;
+static string realm;
 
 namespace impala {
 
-class RpcMgrKerberizedTest :
-    public RpcMgrTestBase<testing::TestWithParam<KerberosSwitch> > {
-
+class RpcMgrKerberizedTest : public RpcMgrTest {
   virtual void SetUp() override {
-    FLAGS_principal = "dummy-service/host@realm";
-    FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
+    FLAGS_principal = "dummy/host@realm";
+    FLAGS_be_principal = strings::Substitute("$0@$1", principal, realm);
+    FLAGS_keytab_file = principal_kt_path;
+    FLAGS_krb5_ccname = "/tmp/krb5cc_impala_internal";
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
-    RpcMgrTestBase::SetUp();
+    RpcMgrTest::SetUp();
   }
 
   virtual void TearDown() override {
     FLAGS_principal.clear();
     FLAGS_be_principal.clear();
+    FLAGS_keytab_file.clear();
+    FLAGS_krb5_ccname.clear();
+    RpcMgrTest::TearDown();
   }
 };
 
-INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
-                        RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_ON));
-
-TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
+TEST_F(RpcMgrKerberizedTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
   // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
   // new gtest params to turn on TLS which needs to be a coordinated change across
@@ -69,10 +76,78 @@ TEST_P(RpcMgrKerberizedTest, MultipleServicesTls) {
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
+// This test aims to exercise the authorization function in RpcMgr by accessing
+// services with a principal different from FLAGS_be_principal.
+TEST_F(RpcMgrKerberizedTest, AuthorizationFail) {
+  GeneratedServiceIf* ping_impl =
+      TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_));
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
+  const int num_service_threads = 10;
+  const int queue_size = 10;
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+
+  // Switch over to a credentials cache which only contains the dummy credential.
+  // Kinit done in InitAuth() uses a different credentials cache.
+  DCHECK_NE(FLAGS_krb5_ccname, kdc_ccname);
+  discard_result(setenv("KRB5CCNAME", kdc_ccname.c_str(), 1));
+
+  RpcController controller;
+  Status rpc_status;
+
+  // ScanMemService's authorization function always returns true so we should be able
+  // to access with dummy credentials.
+  unique_ptr<ScanMemServiceProxy> scan_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(
+      krpc_address_, FLAGS_hostname, &scan_proxy));
+  ScanMemRequestPB scan_request;
+  ScanMemResponsePB scan_response;
+  SetupScanMemRequest(&scan_request, &controller);
+  controller.Reset();
+  rpc_status =
+      FromKuduStatus(scan_proxy->ScanMem(scan_request, &scan_response, &controller));
+  EXPECT_TRUE(rpc_status.ok());
+
+  // Fail to access PingService as it's expecting FLAGS_be_principal as principal name.
+  unique_ptr<PingServiceProxy> ping_proxy;
+  ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(
+      krpc_address_, FLAGS_hostname, &ping_proxy));
+  PingRequestPB ping_request;
+  PingResponsePB ping_response;
+  controller.Reset();
+  rpc_status =
+      FromKuduStatus(ping_proxy->Ping(ping_request, &ping_response, &controller));
+  EXPECT_TRUE(!rpc_status.ok());
+  const string& err_string =
+      "Not authorized: {username='dummy', principal='dummy/host@KRBTEST.COM'}";
+  EXPECT_NE(rpc_status.GetDetail().find(err_string), string::npos);
+}
+
+// Test cases in which bad Kerberos credentials cache path is specified.
+TEST_F(RpcMgrKerberizedTest, BadCredentialsCachePath) {
+  FLAGS_krb5_ccname = "MEMORY:foo";
+  Status status = InitAuth(CURRENT_EXECUTABLE_PATH);
+  ASSERT_TRUE(!status.ok());
+  EXPECT_EQ(status.GetDetail(),
+      "Bad --krb5_ccname value: MEMORY:foo is not an absolute file path\n");
+
+  FLAGS_krb5_ccname = "~/foo";
+  status = InitAuth(CURRENT_EXECUTABLE_PATH);
+  ASSERT_TRUE(!status.ok());
+  EXPECT_EQ(status.GetDetail(),
+      "Bad --krb5_ccname value: ~/foo is not an absolute file path\n");
+}
+
 } // namespace impala
 
 using impala::Status;
@@ -86,15 +161,29 @@ int main(int argc, char** argv) {
   impala::IpAddr ip;
   impala::Status status = impala::HostnameToIpAddr(FLAGS_hostname, &ip);
   DCHECK(status.ok());
-  kdc_principal = Substitute("impala-test/$0", FLAGS_hostname);
-  kdc_realm = "KRBTEST.COM";
+  principal = Substitute("impala-test/$0", FLAGS_hostname);
+  realm = "KRBTEST.COM";
 
   int port = impala::FindUnusedEphemeralPort();
   std::unique_ptr<impala::MiniKdcWrapper> kdc;
-  status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(
-      kdc_principal, kdc_realm, "24h", "7d", port, &kdc);
+  status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(realm, "24h", "7d", port, &kdc);
   DCHECK(status.ok());
 
+  // Create a valid service principal and the associated keytab used for this test.
+  status = kdc->CreateServiceKeytab(principal, &principal_kt_path);
+  DCHECK(status.ok());
+
+  // Create a dummy service principal which is not authorized to access PingService.
+  const string& dummy_principal = "dummy/host";
+  status = kdc->CreateUserPrincipal(dummy_principal);
+  DCHECK(status.ok());
+  status = kdc->Kinit(dummy_principal);
+  DCHECK(status.ok());
+
+  // Get "KRB5CCNAME" set up by mini-kdc. It's the credentials cache which contains
+  // the dummy service's key
+  kdc_ccname = kdc->GetKrb5CCname();
+
   // Fill in the path of the current binary for use by the tests.
   CURRENT_EXECUTABLE_PATH = argv[0];
   int retval = RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
deleted file mode 100644
index 258bd4b..0000000
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ /dev/null
@@ -1,284 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "rpc/rpc-mgr.inline.h"
-
-#include "common/init.h"
-#include "exec/kudu-util.h"
-#include "kudu/rpc/rpc_context.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/rpc_sidecar.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "rpc/auth-provider.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
-#include "testutil/gtest-util.h"
-#include "testutil/mini-kdc-wrapper.h"
-#include "testutil/scoped-flag-setter.h"
-#include "util/counting-barrier.h"
-#include "util/network-util.h"
-#include "util/openssl-util.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/rpc_test.proxy.h"
-#include "gen-cpp/rpc_test.service.h"
-#include "gen-cpp/rpc_test.pb.h"
-
-#include "common/names.h"
-
-using kudu::rpc::GeneratedServiceIf;
-using kudu::rpc::RpcController;
-using kudu::rpc::RpcContext;
-using kudu::rpc::RpcSidecar;
-using kudu::Slice;
-
-using namespace std;
-
-DECLARE_int32(num_reactor_threads);
-DECLARE_int32(num_acceptor_threads);
-DECLARE_string(hostname);
-
-DECLARE_string(ssl_client_ca_certificate);
-DECLARE_string(ssl_server_certificate);
-DECLARE_string(ssl_private_key);
-DECLARE_string(ssl_private_key_password_cmd);
-DECLARE_string(ssl_cipher_list);
-
-// The path of the current executable file that is required for passing into the SASL
-// library as the 'application name'.
-static string CURRENT_EXECUTABLE_PATH;
-
-namespace impala {
-
-static int32_t SERVICE_PORT = FindUnusedEphemeralPort();
-
-const static string IMPALA_HOME(getenv("IMPALA_HOME"));
-const string& SERVER_CERT =
-    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
-const string& PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
-const string& BAD_SERVER_CERT =
-    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
-const string& BAD_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
-const string& PASSWORD_PROTECTED_PRIVATE_KEY =
-    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
-
-/// Use this class to set the appropriate required TLS flags for the duration of the
-/// lifetime of the object.
-/// It is assumed that the flags always hold empty values by default.
-class ScopedSetTlsFlags {
- public:
-  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
-      const string& pkey_passwd = "", const string& ciphers = "") {
-    FLAGS_ssl_server_certificate = cert;
-    FLAGS_ssl_private_key = pkey;
-    FLAGS_ssl_client_ca_certificate = ca_cert;
-    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
-    FLAGS_ssl_cipher_list = ciphers;
-  }
-
-  ~ScopedSetTlsFlags() {
-    FLAGS_ssl_server_certificate = "";
-    FLAGS_ssl_private_key = "";
-    FLAGS_ssl_client_ca_certificate = "";
-    FLAGS_ssl_private_key_password_cmd = "";
-    FLAGS_ssl_cipher_list = "";
-  }
-};
-
-// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
-// support.
-const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
-const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
-
-#define PAYLOAD_SIZE (4096)
-
-template <class T> class RpcMgrTestBase : public T {
- public:
-  // Utility function to initialize the parameter for ScanMem RPC.
-  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
-  // to 'controller'. Also sets up 'request' with the random value and index of the
-  // sidecar.
-  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
-    int32_t pattern = random();
-    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
-    int idx;
-    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
-    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
-    request->set_pattern(pattern);
-    request->set_sidecar_idx(idx);
-  }
-
-  // Takes over ownership of the newly created 'service' which needs to have a lifetime
-  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
-  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
-    services_.emplace_back(move(service));
-    return services_.back().get();
-  }
-
- protected:
-  TNetworkAddress krpc_address_;
-  RpcMgr rpc_mgr_;
-
-  virtual void SetUp() override {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
-    exec_env_.reset(new ExecEnv());
-    ASSERT_OK(rpc_mgr_.Init());
-  }
-
-  virtual void TearDown() override {
-    rpc_mgr_.Shutdown();
-  }
-
- private:
-  int32_t payload_[PAYLOAD_SIZE];
-
-  // Own all the services used by the test.
-  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
-
-  // Required to set up the RPC metric groups used by the service pool.
-  std::unique_ptr<ExecEnv> exec_env_;
-};
-
-typedef std::function<void(RpcContext*)> ServiceCB;
-
-class PingServiceImpl : public PingServiceIf {
- public:
-  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
-  PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
-      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
-
-  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
-      context) override {
-    response->set_int_response(42);
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_.Release(context->GetTransferSize());
-    cb_(context);
-  }
-
-  MemTracker* mem_tracker() { return &mem_tracker_; }
-
- private:
-  MemTracker mem_tracker_;
-  ServiceCB cb_;
-};
-
-class ScanMemServiceImpl : public ScanMemServiceIf {
- public:
-  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(-1, "ScanMem Service") {
-  }
-
-  // The request comes with an int 'pattern' and a payload of int array sent with
-  // sidecar. Scan the array to make sure every element matches 'pattern'.
-  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
-      RpcContext* context) override {
-    int32_t pattern = request->pattern();
-    Slice payload;
-    ASSERT_OK(
-        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
-    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
-
-    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
-    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
-      int32_t val = v[i];
-      if (val != pattern) {
-        // Incoming requests will already be tracked and we need to release the memory.
-        mem_tracker_.Release(context->GetTransferSize());
-        context->RespondFailure(kudu::Status::Corruption(
-            Substitute("Expecting $1; Found $2", pattern, val)));
-        return;
-      }
-    }
-    // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_.Release(context->GetTransferSize());
-    context->RespondSuccess();
-  }
-
-  MemTracker* mem_tracker() { return &mem_tracker_; }
-
- private:
-  MemTracker mem_tracker_;
-
-};
-
-template <class T>
-Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
-    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  // Test that a service can be started, and will respond to requests.
-  GeneratedServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
-      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
-      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
-
-  // Test that a second service, that verifies the RPC payload is not corrupted,
-  // can be started.
-  GeneratedServiceIf* scan_mem_impl = test_base->TakeOverService(
-      make_unique<ScanMemServiceImpl>(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
-      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
-
-  FLAGS_num_acceptor_threads = 2;
-  FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
-
-  unique_ptr<PingServiceProxy> ping_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
-      &ping_proxy));
-
-  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
-  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
-      &scan_mem_proxy));
-
-  RpcController controller;
-  srand(0);
-  // Randomly invoke either services to make sure a RpcMgr can host multiple
-  // services at the same time.
-  for (int i = 0; i < 100; ++i) {
-    controller.Reset();
-    if (random() % 2 == 0) {
-      PingRequestPB request;
-      PingResponsePB response;
-      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
-          "unable to execute Ping() RPC.");
-      if (response.int_response() != 42) {
-          return Status(Substitute(
-              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
-                  response.int_response()));
-      }
-    } else {
-      ScanMemRequestPB request;
-      ScanMemResponsePB response;
-      test_base->SetupScanMemRequest(&request, &controller);
-      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
-          "unable to execute ScanMem() RPC.");
-    }
-  }
-
-  return Status::OK();
-}
-
-} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 6a774bd..0d4ad47 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/rpc-mgr-test-base.h"
+#include "rpc/rpc-mgr-test.h"
+
+#include "kudu/util/monotime.h"
 #include "service/fe-support.h"
+#include "testutil/mini-kdc-wrapper.h"
+#include "util/counting-barrier.h"
 
 using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::RpcController;
@@ -28,18 +32,8 @@ DECLARE_int32(num_acceptor_threads);
 DECLARE_int32(rpc_negotiation_timeout_ms);
 DECLARE_string(hostname);
 
-namespace impala {
-
 // For tests that do not require kerberized testing, we use RpcTest.
-class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
-  virtual void SetUp() override {
-    RpcMgrTestBase::SetUp();
-  }
-
-  virtual void TearDown() override {
-    RpcMgrTestBase::TearDown();
-  }
-};
+namespace impala {
 
 TEST_F(RpcMgrTest, MultipleServicesTls) {
   // TODO: We're starting a seperate RpcMgr here instead of configuring
@@ -57,12 +51,12 @@ TEST_F(RpcMgrTest, MultipleServicesTls) {
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
   ASSERT_OK(tls_rpc_mgr.Init());
 
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
 TEST_F(RpcMgrTest, MultipleServices) {
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
+  ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_));
 }
 
 // Test with a misconfigured TLS certificate and verify that an error is thrown.
@@ -112,7 +106,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -146,7 +140,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -165,7 +159,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ASSERT_OK(tls_rpc_mgr.Init());
-  ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
+  ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
 
@@ -179,8 +173,8 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  GeneratedServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
-      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
+  GeneratedServiceIf* ping_impl =
+      TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
   ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
@@ -205,8 +199,8 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  GeneratedServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
-      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_));
   ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
@@ -252,8 +246,7 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
   secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port);
 
   ASSERT_OK(secondary_rpc_mgr.Init());
-  ASSERT_FALSE(RunMultipleServicesTestTemplate(
-      this, &secondary_rpc_mgr, secondary_krpc_address).ok());
+  ASSERT_FALSE(RunMultipleServicesTest(&secondary_rpc_mgr, secondary_krpc_address).ok());
   secondary_rpc_mgr.Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr-test.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
new file mode 100644
index 0000000..bd2ea96
--- /dev/null
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RPC_RPC_MGR_TEST_H
+#define IMPALA_RPC_RPC_MGR_TEST_H
+
+#include "rpc/rpc-mgr.inline.h"
+
+#include "common/init.h"
+#include "exec/kudu-util.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/util/status.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/gtest-util.h"
+#include "testutil/scoped-flag-setter.h"
+#include "util/network-util.h"
+#include "util/openssl-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/rpc_test.proxy.h"
+#include "gen-cpp/rpc_test.service.h"
+#include "gen-cpp/rpc_test.pb.h"
+
+#include "common/names.h"
+
+using kudu::rpc::GeneratedServiceIf;
+using kudu::rpc::RpcController;
+using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
+using kudu::Slice;
+
+using namespace std;
+
+DECLARE_int32(num_reactor_threads);
+DECLARE_int32(num_acceptor_threads);
+DECLARE_string(hostname);
+
+DECLARE_string(ssl_client_ca_certificate);
+DECLARE_string(ssl_server_certificate);
+DECLARE_string(ssl_private_key);
+DECLARE_string(ssl_private_key_password_cmd);
+DECLARE_string(ssl_cipher_list);
+
+// The path of the current executable file that is required for passing into the SASL
+// library as the 'application name'.
+static string CURRENT_EXECUTABLE_PATH;
+
+namespace impala {
+
+static int32_t SERVICE_PORT = FindUnusedEphemeralPort();
+
+const static string IMPALA_HOME(getenv("IMPALA_HOME"));
+const string& SERVER_CERT =
+    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
+const string& PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
+const string& BAD_SERVER_CERT =
+    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
+const string& BAD_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
+const string& PASSWORD_PROTECTED_PRIVATE_KEY =
+    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
+
+/// Use this class to set the appropriate required TLS flags for the duration of the
+/// lifetime of the object.
+/// It is assumed that the flags always hold empty values by default.
+class ScopedSetTlsFlags {
+ public:
+  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
+      const string& pkey_passwd = "", const string& ciphers = "") {
+    FLAGS_ssl_server_certificate = cert;
+    FLAGS_ssl_private_key = pkey;
+    FLAGS_ssl_client_ca_certificate = ca_cert;
+    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
+    FLAGS_ssl_cipher_list = ciphers;
+  }
+
+  ~ScopedSetTlsFlags() {
+    FLAGS_ssl_server_certificate = "";
+    FLAGS_ssl_private_key = "";
+    FLAGS_ssl_client_ca_certificate = "";
+    FLAGS_ssl_private_key_password_cmd = "";
+    FLAGS_ssl_cipher_list = "";
+  }
+};
+
+// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
+// support.
+const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
+const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
+
+#define PAYLOAD_SIZE (4096)
+
+class RpcMgrTest : public testing::Test {
+ public:
+  // Utility function to initialize the parameter for ScanMem RPC.
+  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
+  // to 'controller'. Also sets up 'request' with the random value and index of the
+  // sidecar.
+  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
+    int32_t pattern = random();
+    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
+    int idx;
+    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
+    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
+    request->set_pattern(pattern);
+    request->set_sidecar_idx(idx);
+  }
+
+  // Utility function which alternately makes requests to PingService and ScanMemService.
+  Status RunMultipleServicesTest(RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address);
+
+ protected:
+  TNetworkAddress krpc_address_;
+  RpcMgr rpc_mgr_;
+
+  virtual void SetUp() {
+    IpAddr ip;
+    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
+    exec_env_.reset(new ExecEnv());
+    ASSERT_OK(rpc_mgr_.Init());
+  }
+
+  virtual void TearDown() {
+    rpc_mgr_.Shutdown();
+  }
+
+  // Takes over ownership of the newly created 'service' which needs to have a lifetime
+  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
+  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
+    services_.emplace_back(move(service));
+    return services_.back().get();
+  }
+
+ private:
+  int32_t payload_[PAYLOAD_SIZE];
+
+  // Own all the services used by the test.
+  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;
+
+  // Required to set up the RPC metric groups used by the service pool.
+  std::unique_ptr<ExecEnv> exec_env_;
+};
+
+typedef std::function<void(RpcContext*)> ServiceCB;
+
+class PingServiceImpl : public PingServiceIf {
+ public:
+  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
+  PingServiceImpl(RpcMgr* rpc_mgr,
+      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
+    : PingServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      rpc_mgr_(rpc_mgr),
+      mem_tracker_(-1, "Ping Service"),
+      cb_(cb) {}
+
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, RpcContext* context) override {
+    return rpc_mgr_->Authorize("PingService", context, mem_tracker());
+  }
+
+  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
+      context) override {
+    response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_.Release(context->GetTransferSize());
+    cb_(context);
+  }
+
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
+ private:
+  RpcMgr* rpc_mgr_;
+  MemTracker mem_tracker_;
+  ServiceCB cb_;
+};
+
+class ScanMemServiceImpl : public ScanMemServiceIf {
+ public:
+  ScanMemServiceImpl(RpcMgr* rpc_mgr)
+    : ScanMemServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
+      mem_tracker_(-1, "ScanMem Service") {
+  }
+
+  // A no-op authorization function.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, RpcContext* context) override {
+    return true;
+  }
+
+  // The request comes with an int 'pattern' and a payload of int array sent with
+  // sidecar. Scan the array to make sure every element matches 'pattern'.
+  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
+      RpcContext* context) override {
+    int32_t pattern = request->pattern();
+    Slice payload;
+    ASSERT_OK(
+        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
+    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);
+
+    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
+    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
+      int32_t val = v[i];
+      if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_.Release(context->GetTransferSize());
+        context->RespondFailure(kudu::Status::Corruption(
+            Substitute("Expecting $1; Found $2", pattern, val)));
+        return;
+      }
+    }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_.Release(context->GetTransferSize());
+    context->RespondSuccess();
+  }
+
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
+ private:
+  MemTracker mem_tracker_;
+
+};
+
+Status RpcMgrTest::RunMultipleServicesTest(
+    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  // Test that a service can be started, and will respond to requests.
+  GeneratedServiceIf* ping_impl = TakeOverService(
+      make_unique<PingServiceImpl>(rpc_mgr));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
+
+  // Test that a second service, that verifies the RPC payload is not corrupted,
+  // can be started.
+  GeneratedServiceIf* scan_mem_impl =
+      TakeOverService(make_unique<ScanMemServiceImpl>(rpc_mgr));
+
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
+
+  FLAGS_num_acceptor_threads = 2;
+  FLAGS_num_reactor_threads = 10;
+  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+
+  unique_ptr<PingServiceProxy> ping_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, FLAGS_hostname,
+      &ping_proxy));
+
+  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
+  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, FLAGS_hostname,
+      &scan_mem_proxy));
+
+  RpcController controller;
+  srand(0);
+  // Randomly invoke either services to make sure a RpcMgr can host multiple
+  // services at the same time.
+  for (int i = 0; i < 100; ++i) {
+    controller.Reset();
+    if (random() % 2 == 0) {
+      PingRequestPB request;
+      PingResponsePB response;
+      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
+          "unable to execute Ping() RPC.");
+      if (response.int_response() != 42) {
+          return Status(Substitute(
+              "Ping() failed. Incorrect response. Expected: 42; Got: $0",
+                  response.int_response()));
+      }
+    } else {
+      ScanMemRequestPB request;
+      ScanMemResponsePB response;
+      SetupScanMemRequest(&request, &controller);
+      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
+          "unable to execute ScanMem() RPC.");
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index a95f181..cda7161 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -19,11 +19,15 @@
 
 #include "exec/kudu-util.h"
 #include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/service_if.h"
+#include "kudu/security/init.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
+#include "runtime/mem-tracker.h"
 #include "util/auth-util.h"
 #include "util/cpu-info.h"
 #include "util/network-util.h"
@@ -39,11 +43,13 @@ using kudu::MonoDelta;
 using kudu::rpc::AcceptorPool;
 using kudu::rpc::DumpRunningRpcsRequestPB;
 using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::GeneratedServiceIf;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::Messenger;
+using kudu::rpc::RemoteUser;
 using kudu::rpc::RpcConnectionPB;
+using kudu::rpc::RpcContext;
 using kudu::rpc::RpcController;
-using kudu::rpc::GeneratedServiceIf;
 using kudu::Sockaddr;
 
 DECLARE_string(hostname);
@@ -107,7 +113,6 @@ Status RpcMgr::Init() {
   if (IsKerberosEnabled()) {
     string internal_principal;
     RETURN_IF_ERROR(GetInternalKerberosPrincipal(&internal_principal));
-
     string service_name, unused_hostname, unused_realm;
     RETURN_IF_ERROR(ParseKerberosPrincipal(internal_principal, &service_name,
         &unused_hostname, &unused_realm));
@@ -151,6 +156,30 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
   return Status::OK();
 }
 
+bool RpcMgr::Authorize(const string& service_name, RpcContext* context,
+    MemTracker* mem_tracker) const {
+  // Authorization is enforced iff Kerberos is enabled.
+  if (!IsKerberosEnabled()) return true;
+
+  // Check if the mapped username matches that of the kinit'ed principal.
+  const RemoteUser& remote_user = context->remote_user();
+  const string& logged_in_username =
+      kudu::security::GetLoggedInUsernameFromKeytab().value_or("");
+  DCHECK(!logged_in_username.empty());
+  bool authorized = remote_user.username() == logged_in_username &&
+      remote_user.authenticated_by() == RemoteUser::Method::KERBEROS;
+  if (UNLIKELY(!authorized)) {
+    LOG(ERROR) << Substitute("Rejecting unauthorized access to $0 from $1. Expected "
+        "user $2", service_name, context->requestor_string(), logged_in_username);
+    mem_tracker->Release(context->GetTransferSize());
+    context->RespondFailure(kudu::Status::NotAuthorized(
+        Substitute("$0 is not allowed to access $1",
+            remote_user.ToString(), service_name)));
+    return false;
+  }
+  return true;
+}
+
 Status RpcMgr::StartServices(const TNetworkAddress& address) {
   DCHECK(is_inited()) << "Must call Init() before StartServices()";
   DCHECK(!services_started_) << "May not call StartServices() twice";

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index c25f754..6435a74 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -133,6 +133,16 @@ class RpcMgr {
       kudu::rpc::GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
+  /// Returns true if the given 'remote_user' in RpcContext 'context' is authorized to
+  /// access 'service_name' registered with this RpcMgr. Authorization is only enforced
+  /// when Kerberos is enabled.
+  ///
+  /// If authorization is denied, the RPC is responded to with an error message. Memory
+  /// of RPC payloads accounted towards 'mem_tracker', the service's MemTracker, is also
+  /// released.
+  bool Authorize(const string& service_name, kudu::rpc::RpcContext* context,
+      MemTracker* mem_tracker) const;
+
   /// Creates a new proxy for a remote service of type P at location 'address' with
   /// hostname 'hostname' and places it in 'proxy'. 'P' must descend from
   /// kudu::rpc::ServiceIf. Note that 'address' must be a resolved IP address.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index af867de..621f557 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -37,6 +37,7 @@ using apache::thrift::transport::SSLProtocol;
 
 DECLARE_string(principal);
 DECLARE_string(be_principal);
+DECLARE_string(keytab_file);
 DECLARE_string(ssl_client_ca_certificate);
 DECLARE_string(ssl_cipher_list);
 DECLARE_string(ssl_minimum_version);
@@ -59,8 +60,9 @@ static const string& PASSWORD_PROTECTED_PRIVATE_KEY =
     Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
 
 // The principal name and the realm used for creating the mini-KDC.
-static const string kdc_principal = "impala/localhost";
-static const string kdc_realm = "KRBTEST.COM";
+static const string principal = "impala/localhost";
+static const string realm = "KRBTEST.COM";
+static string principal_kt_path;
 
 // Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
 // support.
@@ -105,9 +107,11 @@ class ThriftKerberizedParamsTest :
     if (k == KERBEROS_OFF) {
       FLAGS_principal.clear();
       FLAGS_be_principal.clear();
+      FLAGS_keytab_file.clear();
     } else {
       FLAGS_principal = "dummy-service/host@realm";
-      FLAGS_be_principal = strings::Substitute("$0@$1", kdc_principal, kdc_realm);
+      FLAGS_be_principal = strings::Substitute("$0@$1", principal, realm);
+      FLAGS_keytab_file = principal_kt_path;
     }
     ASSERT_OK(InitAuth(CURRENT_EXECUTABLE_PATH));
     ThriftTestBase::SetUp();
@@ -116,6 +120,7 @@ class ThriftKerberizedParamsTest :
   virtual void TearDown() override {
     FLAGS_principal.clear();
     FLAGS_be_principal.clear();
+    FLAGS_keytab_file.clear();
   }
 };
 
@@ -558,8 +563,12 @@ int main(int argc, char** argv) {
 
   int port = impala::FindUnusedEphemeralPort();
   std::unique_ptr<impala::MiniKdcWrapper> kdc;
-  Status status = impala::MiniKdcWrapper::SetupAndStartMiniKDC(
-      kdc_principal, kdc_realm, "24h", "7d", port, &kdc);
+  Status status =
+      impala::MiniKdcWrapper::SetupAndStartMiniKDC(realm, "24h", "7d", port, &kdc);
+  DCHECK(status.ok());
+
+  // Create the service principal and keytab used for this test.
+  status = kdc->CreateServiceKeytab(principal, &principal_kt_path);
   DCHECK(status.ok());
 
   // Fill in the path of the current binary for use by the tests.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index c1f9cc6..de5e349 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -107,6 +107,11 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker());
   }
 
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, kudu::rpc::RpcContext* context) {
+    return true;
+  }
+
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, RpcContext* rpc_context) {
     stream_mgr_->AddData(request, response, rpc_context);

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index b7892ff..723d7ca 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -71,6 +71,12 @@ Status DataStreamService::Init() {
   return Status::OK();
 }
 
+bool DataStreamService::Authorize(const google::protobuf::Message* req,
+    google::protobuf::Message* resp, RpcContext* context) {
+  return ExecEnv::GetInstance()->rpc_mgr()->Authorize("DataStreamService", context,
+      mem_tracker());
+}
+
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {
   // CloseSender() is guaranteed to eventually respond to this RPC so we don't do it here.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index e233165..5fdf6dd 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -46,6 +46,12 @@ class DataStreamService : public DataStreamServiceIf {
   /// This mustn't be called until RPC manager has been initialized.
   Status Init();
 
+  /// Returns true iff the 'remote_user' in 'context' is authorized to access
+  /// DataStreamService. On denied access, the RPC is replied to with an error message.
+  /// Authorization is enforced only when Kerberos is enabled.
+  virtual bool Authorize(const google::protobuf::Message* req,
+      google::protobuf::Message* resp, kudu::rpc::RpcContext* context);
+
   /// Notifies the receiver to close the data stream specified in 'request'.
   /// The receiver replies to the client with a status serialized in 'response'.
   virtual void EndDataStream(const EndDataStreamRequestPB* request,

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/testutil/mini-kdc-wrapper.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/mini-kdc-wrapper.cc b/be/src/testutil/mini-kdc-wrapper.cc
index 526e5b1..11c4d64 100644
--- a/be/src/testutil/mini-kdc-wrapper.cc
+++ b/be/src/testutil/mini-kdc-wrapper.cc
@@ -32,11 +32,12 @@ using filesystem::path;
 
 DECLARE_string(keytab_file);
 DECLARE_string(krb5_conf);
+DECLARE_string(krb5_ccname);
 
 Status MiniKdcWrapper::StartKdc(string keytab_dir) {
   kudu::MiniKdcOptions options;
   options.realm = realm_;
-  options.data_root = keytab_dir;
+  options.data_root = move(keytab_dir);
   options.ticket_lifetime = ticket_lifetime_;
   options.renew_lifetime = renew_lifetime_;
   options.port = kdc_port_;
@@ -54,34 +55,42 @@ Status MiniKdcWrapper::StopKdc() {
   return Status::OK();
 }
 
+Status MiniKdcWrapper::Kinit(const string& username) {
+  KUDU_RETURN_IF_ERROR(kdc_->Kinit(username), "Failed to kinit.");
+  return Status::OK();
+}
+
+Status MiniKdcWrapper::CreateUserPrincipal(const string& username) {
+  KUDU_RETURN_IF_ERROR(kdc_->CreateUserPrincipal(username),
+      "Failed to create user principal.");
+  return Status::OK();
+}
+
 Status MiniKdcWrapper::CreateServiceKeytab(const string& spn, string* kt_path) {
   KUDU_RETURN_IF_ERROR(kdc_->CreateServiceKeytab(spn, kt_path),
       "Failed to create service keytab.");
   return Status::OK();
 }
 
-Status MiniKdcWrapper::SetupAndStartMiniKDC(string spn, string realm,
-    string ticket_lifetime, string renew_lifetime, int kdc_port,
-    unique_ptr<MiniKdcWrapper>* kdc_ptr) {
-  std::unique_ptr<MiniKdcWrapper> kdc(new MiniKdcWrapper(
-      spn, realm, ticket_lifetime, renew_lifetime, kdc_port));
+Status MiniKdcWrapper::SetupAndStartMiniKDC(string realm,
+    string ticket_lifetime, string renew_lifetime,
+    int kdc_port, unique_ptr<MiniKdcWrapper>* kdc_ptr) {
+  unique_ptr<MiniKdcWrapper> kdc(new MiniKdcWrapper(
+      move(realm), move(ticket_lifetime), move(renew_lifetime), kdc_port));
   DCHECK(kdc.get() != nullptr);
 
   // Enable the workaround for MIT krb5 1.10 bugs from krb5_realm_override.cc.
   setenv("KUDU_ENABLE_KRB5_REALM_FIX", "true", 0);
 
   // Check if the unique directory already exists, and create it if it doesn't.
-  RETURN_IF_ERROR(FileSystemUtil::RemoveAndCreateDirectory(kdc->unique_test_dir_.string()));
+  RETURN_IF_ERROR(
+      FileSystemUtil::RemoveAndCreateDirectory(kdc->unique_test_dir_.string()));
   string keytab_dir = kdc->unique_test_dir_.string() + "/krb5kdc";
 
   RETURN_IF_ERROR(kdc->StartKdc(keytab_dir));
 
-  string kt_path;
-  RETURN_IF_ERROR(kdc->CreateServiceKeytab(kdc->spn_, &kt_path));
-
   // Set the appropriate flags based on how we've set up the kerberos environment.
   FLAGS_krb5_conf = strings::Substitute("$0/$1", keytab_dir, "krb5.conf");
-  FLAGS_keytab_file = kt_path;
 
   *kdc_ptr = std::move(kdc);
   return Status::OK();
@@ -91,7 +100,6 @@ Status MiniKdcWrapper::TearDownMiniKDC() {
   RETURN_IF_ERROR(StopKdc());
 
   // Clear the flags so we don't step on other tests that may run in the same process.
-  FLAGS_keytab_file.clear();
   FLAGS_krb5_conf.clear();
 
   // Remove test directory.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/testutil/mini-kdc-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/mini-kdc-wrapper.h b/be/src/testutil/mini-kdc-wrapper.h
index 602c15b..c233dda 100644
--- a/be/src/testutil/mini-kdc-wrapper.h
+++ b/be/src/testutil/mini-kdc-wrapper.h
@@ -40,19 +40,33 @@ class MiniKdcWrapper {
   /// This function creates the 'unique_test_dir_' path, starts the KDC and sets the
   /// appropriate flags that Impala requires to run with Kerberos. The newly created
   /// KDC is stored in 'kdc_ptr'. Return error status on failure.
-  static Status SetupAndStartMiniKDC(std::string spn, std::string realm,
-      std::string ticket_lifetime, std::string renew_lifetime, int kdc_port,
-      std::unique_ptr<MiniKdcWrapper>* kdc_ptr);
+  static Status SetupAndStartMiniKDC(std::string realm,
+      std::string ticket_lifetime, std::string renew_lifetime,
+      int kdc_port, std::unique_ptr<MiniKdcWrapper>* kdc_ptr);
 
   /// Undoes everything done by SetupAndStartMiniKDC().
   Status TearDownMiniKDC();
 
+  /// Kinit a user to the mini KDC.
+  Status Kinit(const string& username);
+
+  /// Creates a new user with the given username.
+  /// The password is the same as the username.
+  Status CreateUserPrincipal(const string& username);
+
+  /// Creates a keytab file under the 'unique_test_dir_' path which is configured to
+  /// authenticate the service principal 'spn'. The path to the file is returned as a
+  /// string in 'kt_path'.
+  Status CreateServiceKeytab(const string& spn, string* kt_path);
+
+  /// Returns the environment variable ""KRB5CCNAME" configured in the setup of mini-kdc.
+  const string GetKrb5CCname() const {
+    return kdc_->GetEnvVars()["KRB5CCNAME"];
+  }
+
  private:
   boost::scoped_ptr<kudu::MiniKdc> kdc_;
 
-  /// The service's principal name.
-  const std::string spn_;
-
   /// The name of the kerberos realm to setup.
   const std::string realm_;
 
@@ -69,26 +83,20 @@ class MiniKdcWrapper {
   boost::filesystem::path unique_test_dir_ = boost::filesystem::unique_path();
 
   /// Called by SetupAndStartMiniKDC() only.
-  MiniKdcWrapper(std::string spn, std::string realm, std::string ticket_lifetime,
-    std::string renew_lifetime, int kdc_port) :
-      spn_(spn),
-      realm_(realm),
-      ticket_lifetime_(ticket_lifetime),
-      renew_lifetime_(renew_lifetime),
+  MiniKdcWrapper(std::string&& realm, std::string&& ticket_lifetime,
+      std::string&& renew_lifetime, int kdc_port)
+    : realm_(std::move(realm)),
+      ticket_lifetime_(std::move(ticket_lifetime)),
+      renew_lifetime_(std::move(renew_lifetime)),
       kdc_port_(kdc_port) {
   }
 
   /// Starts the KDC and configures it to use 'keytab_dir' as the location to store the
   /// keytab. The 'keytab_dir' will not be cleaned up by this class.
-  Status StartKdc(string keytab_dir);
+  Status StartKdc(std::string keytab_dir);
 
   /// Stops the KDC by terminating the krb5kdc subprocess.
   Status StopKdc();
-
-  /// Creates a keytab file under the 'unique_test_dir_' path which is configured to
-  /// authenticate the service principal 'spn_'. The path to the file is returned as a
-  /// string in 'kt_path'.
-  Status CreateServiceKeytab(const string& spn, string* kt_path);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/util/auth-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.cc b/be/src/util/auth-util.cc
index 50af438..e8a0e41 100644
--- a/be/src/util/auth-util.cc
+++ b/be/src/util/auth-util.cc
@@ -31,25 +31,25 @@ namespace impala {
 // Pattern for hostname substitution.
 static const string HOSTNAME_PATTERN = "_HOST";
 
-  const string& GetEffectiveUser(const TSessionState& session) {
-    if (session.__isset.delegated_user && !session.delegated_user.empty()) {
-      return session.delegated_user;
-    }
-    return session.connected_user;
+const string& GetEffectiveUser(const TSessionState& session) {
+  if (session.__isset.delegated_user && !session.delegated_user.empty()) {
+    return session.delegated_user;
   }
+  return session.connected_user;
+}
 
-  const string& GetEffectiveUser(const ImpalaServer::SessionState& session) {
-    return session.do_as_user.empty() ? session.connected_user : session.do_as_user;
-  }
+const string& GetEffectiveUser(const ImpalaServer::SessionState& session) {
+  return session.do_as_user.empty() ? session.connected_user : session.do_as_user;
+}
 
-  Status CheckProfileAccess(const string& user, const string& effective_user,
-      bool has_access) {
-    if (user.empty() || (user == effective_user && has_access)) return Status::OK();
-    stringstream ss;
-    ss << "User " << user << " is not authorized to access the runtime profile or "
-       << "execution summary.";
-    return Status(ss.str());
-  }
+Status CheckProfileAccess(const string& user, const string& effective_user,
+    bool has_access) {
+  if (user.empty() || (user == effective_user && has_access)) return Status::OK();
+  stringstream ss;
+  ss << "User " << user << " is not authorized to access the runtime profile or "
+     << "execution summary.";
+  return Status(ss.str());
+}
 
 // Replaces _HOST with the hostname if it occurs in the principal string.
 Status ReplacePrincipalHostFormat(string* out_principal) {
@@ -83,6 +83,7 @@ Status GetInternalKerberosPrincipal(string* out_principal) {
 
 Status ParseKerberosPrincipal(const string& principal, string* service_name,
     string* hostname, string* realm) {
+  // TODO: IMPALA-7504: replace this with krb5_parse_name().
   vector<string> names;
 
   split(names, principal, is_any_of("/"));
@@ -100,8 +101,4 @@ Status ParseKerberosPrincipal(const string& principal, string* service_name,
   return Status::OK();
 }
 
-bool IsKerberosEnabled() {
-  return !FLAGS_principal.empty();
-}
-
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/be/src/util/auth-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/auth-util.h b/be/src/util/auth-util.h
index 4b9fed6..f73e962 100644
--- a/be/src/util/auth-util.h
+++ b/be/src/util/auth-util.h
@@ -22,6 +22,8 @@
 #include <string>
 #include "service/impala-server.h"
 
+DECLARE_string(principal);
+
 namespace impala {
 
 class TSessionState;
@@ -63,7 +65,9 @@ Status ParseKerberosPrincipal(const std::string& principal, std::string* service
     std::string* hostname, std::string* realm);
 
 /// Returns true if kerberos is enabled.
-bool IsKerberosEnabled();
+inline bool IsKerberosEnabled() {
+  return !FLAGS_principal.empty();
+}
 
 } // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/bin/rat_exclude_files.txt
----------------------------------------------------------------------
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index 10f36af..c0c8fe2 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -43,6 +43,7 @@ www/d3.v3.min.js
 www/jquery/jquery-1.12.4.min.js
 tests/comparison/leopard/static/css/hljs.css
 tests/comparison/leopard/static/js/highlight.pack.js
+common/protobuf/kudu
 be/src/kudu/util/array_view.h
 be/src/kudu/util/cache-test.cc
 be/src/kudu/util/cache.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 854eb87..68c2e90 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -21,6 +21,8 @@ package impala;
 import "common.proto";
 import "row_batch.proto";
 
+import "kudu/rpc/rpc_header.proto";
+
 // All fields are required in V1.
 message TransmitDataRequestPB {
   // The fragment instance id of the receiver.
@@ -76,6 +78,9 @@ message EndDataStreamResponsePB {
 
 // Handles data transmission between fragment instances.
 service DataStreamService {
+  // Override the default authorization method.
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   // Called by sender to transmit a single row batch. Returns error indication
   // if params.fragmentId or params.destNodeId are unknown or if data couldn't
   // be read.

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/kudu
----------------------------------------------------------------------
diff --git a/common/protobuf/kudu b/common/protobuf/kudu
new file mode 120000
index 0000000..6631864
--- /dev/null
+++ b/common/protobuf/kudu
@@ -0,0 +1 @@
+../../be/src/kudu/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/5c541b96/common/protobuf/rpc_test.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto
index fd22331..70782e5 100644
--- a/common/protobuf/rpc_test.proto
+++ b/common/protobuf/rpc_test.proto
@@ -17,6 +17,8 @@
 //
 package impala;
 
+import "kudu/rpc/rpc_header.proto";
+
 // Definitions for service used for rpc-mgr-test.
 message PingRequestPB {
 }
@@ -26,6 +28,8 @@ message PingResponsePB {
 }
 
 service PingService {
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   rpc Ping(PingRequestPB) returns (PingResponsePB);
 }
 
@@ -38,5 +42,7 @@ message ScanMemResponsePB {
 }
 
 service ScanMemService {
+  option (kudu.rpc.default_authz_method) = "Authorize";
+
   rpc ScanMem(ScanMemRequestPB) returns (ScanMemResponsePB);
 }