You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/12/02 22:36:31 UTC

[impala] 02/03: IMPALA-8138: Reintroduce rpc debugging options

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f998427cf0a161c8ae6e57a77e095dada070aa26
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Oct 17 10:37:36 2019 -0700

    IMPALA-8138: Reintroduce rpc debugging options
    
    In the past, Impala had a very simple 'fault injection' framework for
    simulating failed rpcs between impalads. With the move to KRPC, that
    framework was not carried over, and we lost the ability to test
    certain failure scenarios.
    
    This patch reintroduces this functionality. It removes the prior fault
    injection framework in favor of the existing debug action framework,
    which is more flexible.
    
    To facilitate this, a few modifications are made to the debug action
    framework:
    - In addition to matching on a label, debug actions may now match on
      optional arguments. In this patch, the debug action
      IMPALA_SERVICE_POOL takes the arguments 'host', 'port', and
      'rpc name' to allow simulating the failure of specific rpcs to
      specific impalads.
    - The FAIL action now takes an optional 'error message' parameter. In
      this patch, the debug action IMPALA_SERVICE_POOL uses this to
      simulate different types of rpc errors, e.g. 'service too busy'.
    - The FAIL action increments a metric, 'impala.debug_action.fail', so
      that tests can check that it has actually been hit. Prior to this
      patch the tests in test_rpc_exception.py were all passing
      spuriously as the faults they were supposed to be testing were no
      longer being injected.
    
    This patch uses these new mechanisms to introduce tests that simulate
    failures in DataStreamService rpcs. Follow-up patches will add test
    cases for ControlService rpcs.
    
    Change-Id: I9c047ebce6d32c5ae461f70279391fa2df4c2029
    Reviewed-on: http://gerrit.cloudera.org:8080/14641
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/impala-service-pool.cc          |  25 +++++--
 be/src/rpc/impala-service-pool.h           |   8 ++-
 be/src/rpc/rpc-mgr-kerberized-test.cc      |   4 +-
 be/src/rpc/rpc-mgr-test.cc                 |  27 +++----
 be/src/rpc/rpc-mgr-test.h                  |   4 +-
 be/src/rpc/rpc-mgr.cc                      |  19 ++---
 be/src/rpc/rpc-mgr.h                       |  17 +++--
 be/src/runtime/backend-client.h            |   1 -
 be/src/runtime/data-stream-test.cc         |   4 +-
 be/src/runtime/exec-env.cc                 |   4 +-
 be/src/runtime/test-env.cc                 |   2 +-
 be/src/service/control-service.cc          |   1 -
 be/src/service/data-stream-service.cc      |   1 -
 be/src/service/impala-internal-service.cc  |   1 -
 be/src/testutil/CMakeLists.txt             |   1 -
 be/src/testutil/fault-injection-util.cc    |  88 -----------------------
 be/src/testutil/fault-injection-util.h     |  65 -----------------
 be/src/util/debug-util.cc                  |  68 ++++++++++++------
 be/src/util/debug-util.h                   |  16 +++--
 be/src/util/impalad-metrics.cc             |   6 ++
 be/src/util/impalad-metrics.h              |   5 ++
 common/thrift/ImpalaService.thrift         |  21 ++++--
 common/thrift/metrics.json                 |  10 +++
 tests/common/impala_cluster.py             |   5 +-
 tests/common/impala_service.py             |   5 +-
 tests/custom_cluster/test_rpc_exception.py | 109 ++++++++++++++++++-----------
 26 files changed, 239 insertions(+), 278 deletions(-)

diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 9db137a..74fe2df 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "exec/kudu-util.h"
+#include "gutil/strings/numbers.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -58,11 +59,13 @@ const char * ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY =
 
 ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
     int service_queue_length, kudu::rpc::GeneratedServiceIf* service,
-    MemTracker* service_mem_tracker)
+    MemTracker* service_mem_tracker, const TNetworkAddress& address)
   : service_mem_tracker_(service_mem_tracker),
     service_(service),
     service_queue_(service_queue_length),
-    incoming_queue_time_(METRIC_impala_incoming_queue_time.Instantiate(entity)) {
+    incoming_queue_time_(METRIC_impala_incoming_queue_time.Instantiate(entity)),
+    hostname_(address.hostname),
+    port_(SimpleItoa(address.port)) {
   DCHECK(service_mem_tracker_ != nullptr);
   const TMetricDef& overflow_metric_def =
       MetricDefs::Get(RPC_QUEUE_OVERFLOW_METRIC_KEY, service_->service_name());
@@ -188,10 +191,22 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
     service_mem_tracker_->Consume(transfer_size);
   }
 
-  Status debug_status = DebugAction(FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY");
+  // Debug action for simulating rpc errors. To use, specify:
+  // --debug_actions=IMPALA_SERVICE_POOL:<hostname>:<port>:<rpc>:<action>
+  // where <hostname> and <port> represent the impalad receiving the rpc, <port> is the BE
+  // krpc port (default 27000), <rpc> is the name of an rpc in the service, eg.
+  // 'TransmitData' or 'ReportExecStatus', and <action> is any of the debug actions, eg.
+  // FAIL or SLEEP.
+  Status debug_status = DebugAction(FLAGS_debug_actions, "IMPALA_SERVICE_POOL",
+      {hostname_, port_, c->remote_method().method_name()});
   if (UNLIKELY(!debug_status.ok())) {
-    // Simulate the service being too busy.
-    RejectTooBusy(c);
+    if (debug_status.msg().msg() == "REJECT_TOO_BUSY") {
+      // Simulate the service being too busy.
+      RejectTooBusy(c);
+    } else {
+      FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN,
+          kudu::Status::RuntimeError(debug_status.msg().msg()), c);
+    }
     return kudu::Status::OK();
   }
 
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index dd56436..7771358 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -45,9 +45,11 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   ///
   /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of RPC
   /// payloads in the service queue.
+  ///
+  /// 'address' is the ip address and port that 'service' runs on.
   ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
       int service_queue_length, kudu::rpc::GeneratedServiceIf* service,
-      MemTracker* service_mem_tracker);
+      MemTracker* service_mem_tracker, const TNetworkAddress& address);
 
   virtual ~ImpalaServicePool();
 
@@ -111,6 +113,10 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   boost::mutex shutdown_lock_;
   bool closing_ = false;
 
+  /// The address this service is running on.
+  const std::string hostname_;
+  const std::string port_;
+
   DISALLOW_COPY_AND_ASSIGN(ImpalaServicePool);
 };
 
diff --git a/be/src/rpc/rpc-mgr-kerberized-test.cc b/be/src/rpc/rpc-mgr-kerberized-test.cc
index 86c9eaa..62b15e4 100644
--- a/be/src/rpc/rpc-mgr-kerberized-test.cc
+++ b/be/src/rpc/rpc-mgr-kerberized-test.cc
@@ -74,7 +74,7 @@ TEST_F(RpcMgrKerberizedTest, MultipleServicesTls) {
 
   // Enable TLS.
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
-  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address));
 
   ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
@@ -95,7 +95,7 @@ TEST_F(RpcMgrKerberizedTest, AuthorizationFail) {
       static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+  ASSERT_OK(rpc_mgr_.StartServices());
 
   // Switch over to a credentials cache which only contains the dummy credential.
   // Kinit done in InitAuth() uses a different credentials cache.
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index c0d4fbd..cb698fa 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -51,7 +51,7 @@ TEST_F(RpcMgrTest, MultipleServicesTls) {
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
   ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
-  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address));
 
   ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
@@ -74,7 +74,7 @@ TEST_F(RpcMgrTest, BadCertificateTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok());
   tls_rpc_mgr.Shutdown();
 }
 
@@ -91,7 +91,7 @@ TEST_F(RpcMgrTest, BadPasswordTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok());
   tls_rpc_mgr.Shutdown();
 }
 
@@ -108,7 +108,7 @@ TEST_F(RpcMgrTest, CorrectPasswordTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address));
   ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
@@ -125,7 +125,7 @@ TEST_F(RpcMgrTest, BadCiphersTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_FALSE(tls_rpc_mgr.Init().ok());
+  ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok());
   tls_rpc_mgr.Shutdown();
 }
 
@@ -142,7 +142,7 @@ TEST_F(RpcMgrTest, ValidCiphersTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address));
   ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
@@ -161,7 +161,7 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
   int32_t tls_service_port = FindUnusedEphemeralPort();
   tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
 
-  ASSERT_OK(tls_rpc_mgr.Init());
+  ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address));
   ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address));
   tls_rpc_mgr.Shutdown();
 }
@@ -186,7 +186,7 @@ TEST_F(RpcMgrTest, SlowCallback) {
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+  ASSERT_OK(rpc_mgr_.StartServices());
 
   unique_ptr<PingServiceProxy> proxy;
   ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address_,
@@ -216,7 +216,7 @@ TEST_F(RpcMgrTest, AsyncCall) {
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+  ASSERT_OK(rpc_mgr_.StartServices());
 
   RpcController controller;
   srand(0);
@@ -251,7 +251,7 @@ TEST_F(RpcMgrTest, NegotiationTimeout) {
   int32_t secondary_service_port = FindUnusedEphemeralPort();
   secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port);
 
-  ASSERT_OK(secondary_rpc_mgr.Init());
+  ASSERT_OK(secondary_rpc_mgr.Init(secondary_krpc_address));
   ASSERT_FALSE(RunMultipleServicesTest(&secondary_rpc_mgr, secondary_krpc_address).ok());
   secondary_rpc_mgr.Shutdown();
 }
@@ -299,7 +299,7 @@ TEST_F(RpcMgrTest, BusyService) {
       static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
+  ASSERT_OK(rpc_mgr_.StartServices());
 
   // Find the counter which tracks the number of times the service queue is too full.
   const string& overflow_count = Substitute(
@@ -318,8 +318,9 @@ TEST_F(RpcMgrTest, BusyService) {
 
   // Use DebugAction to make the Impala Service Pool reject 50% of Krpc calls as if the
   // service is too busy.
-  auto s = ScopedFlagSetter<string>::Make(
-      &FLAGS_debug_actions, "SERVICE_POOL_SERVER_BUSY:FAIL@0.5");
+  auto s = ScopedFlagSetter<string>::Make(&FLAGS_debug_actions,
+      Substitute("IMPALA_SERVICE_POOL:$0:$1:Ping:FAIL@0.5@REJECT_TOO_BUSY",
+          krpc_address_.hostname, krpc_address_.port));
   PingRequestPB request;
   PingResponsePB response;
   const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
diff --git a/be/src/rpc/rpc-mgr-test.h b/be/src/rpc/rpc-mgr-test.h
index ad428a4..f559409 100644
--- a/be/src/rpc/rpc-mgr-test.h
+++ b/be/src/rpc/rpc-mgr-test.h
@@ -139,7 +139,7 @@ class RpcMgrTest : public testing::Test {
     ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
     krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
     exec_env_.reset(new ExecEnv());
-    ASSERT_OK(rpc_mgr_.Init());
+    ASSERT_OK(rpc_mgr_.Init(krpc_address_));
   }
 
   virtual void TearDown() {
@@ -303,7 +303,7 @@ Status RpcMgrTest::RunMultipleServicesTest(
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
-  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));
+  RETURN_IF_ERROR(rpc_mgr->StartServices());
 
   unique_ptr<PingServiceProxy> ping_proxy;
   RETURN_IF_ERROR(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address,
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 3b407d7..09893e0 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -90,7 +90,10 @@ DEFINE_bool(rpc_use_loopback, false,
 
 namespace impala {
 
-Status RpcMgr::Init() {
+Status RpcMgr::Init(const TNetworkAddress& address) {
+  DCHECK(IsResolvedAddress(address));
+  address_ = address;
+
   // Log any RPCs which take longer than this threshold on the server.
   FLAGS_rpc_duration_too_long_ms = FLAGS_impala_slow_rpc_threshold_ms;
 
@@ -147,8 +150,9 @@ Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queu
     GeneratedServiceIf* service_ptr, MemTracker* service_mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
-  scoped_refptr<ImpalaServicePool> service_pool = new ImpalaServicePool(
-      messenger_->metric_entity(), service_queue_depth, service_ptr, service_mem_tracker);
+  scoped_refptr<ImpalaServicePool> service_pool =
+      new ImpalaServicePool(messenger_->metric_entity(), service_queue_depth, service_ptr,
+          service_mem_tracker, address_);
   // Start the thread pool first before registering the service in case the startup fails.
   RETURN_IF_ERROR(service_pool->Init(num_service_threads));
   KUDU_RETURN_IF_ERROR(
@@ -185,20 +189,19 @@ bool RpcMgr::Authorize(const string& service_name, RpcContext* context,
   return true;
 }
 
-Status RpcMgr::StartServices(const TNetworkAddress& address) {
+Status RpcMgr::StartServices() {
   DCHECK(is_inited()) << "Must call Init() before StartServices()";
   DCHECK(!services_started_) << "May not call StartServices() twice";
 
-  // Convert 'address' to Kudu's Sockaddr
-  DCHECK(IsResolvedAddress(address));
+  // Convert 'address_' to Kudu's Sockaddr
   Sockaddr sockaddr;
   if (FLAGS_rpc_use_loopback) {
     // Listen on all addresses, including loopback.
-    sockaddr.set_port(address.port);
+    sockaddr.set_port(address_.port);
     DCHECK(sockaddr.IsWildcard()) << sockaddr.ToString();
   } else {
     // Only listen on the canonical address for KRPC.
-    RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr));
+    RETURN_IF_ERROR(TNetworkAddressToSockaddr(address_, &sockaddr));
   }
 
   // Call the messenger to create an AcceptorPool for us.
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index e88579f..a141dba 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -102,16 +102,16 @@ class RpcMgr {
  public:
   RpcMgr(bool use_tls = false) : use_tls_(use_tls) {}
 
-  /// Initializes the reactor threads, and prepares for sending outbound RPC requests.
-  Status Init() WARN_UNUSED_RESULT;
+  /// Initializes the reactor threads, and prepares for sending outbound RPC requests. All
+  /// services will be started on 'address', which must be a resolved IP address.
+  Status Init(const TNetworkAddress& address) WARN_UNUSED_RESULT;
 
   bool is_inited() const { return messenger_.get() != nullptr; }
 
-  /// Start the acceptor threads which listen on 'address', making KRPC services
-  /// available. 'address' has to be a resolved IP address. Before this method is called,
-  /// remote clients will get a 'connection refused' error when trying to invoke an RPC
-  /// on this host.
-  Status StartServices(const TNetworkAddress& address) WARN_UNUSED_RESULT;
+  /// Start the acceptor threads which listen on 'address_', making KRPC services
+  /// available. Before this method is called, remote clients will get a 'connection
+  /// refused' error when trying to invoke an RPC on this host.
+  Status StartServices() WARN_UNUSED_RESULT;
 
   /// Register a new service.
   ///
@@ -223,6 +223,9 @@ class RpcMgr {
   /// True if TLS is configured for communication between Impala backends. messenger_ will
   /// be configured to use TLS if this is set.
   const bool use_tls_;
+
+  /// The host/port the rpc services are run on.
+  TNetworkAddress address_;
 };
 
 } // namespace impala
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
index 92279fc..fee4d39 100644
--- a/be/src/runtime/backend-client.h
+++ b/be/src/runtime/backend-client.h
@@ -19,7 +19,6 @@
 #define IMPALA_BACKEND_CLIENT_H
 
 #include "runtime/client-cache.h"
-#include "testutil/fault-injection-util.h"
 #include "util/runtime-profile-counters.h"
 
 #include "gen-cpp/ImpalaInternalService.h"
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 5d39c56..3121ee9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -492,12 +492,12 @@ class DataStreamTest : public testing::Test {
   void StartKrpcBackend() {
     RpcMgr* rpc_mgr = exec_env_->rpc_mgr();
     KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->stream_mgr();
-    ASSERT_OK(rpc_mgr->Init());
+    ASSERT_OK(rpc_mgr->Init(krpc_address_));
     test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr,
         exec_env_->process_mem_tracker()));
     ASSERT_OK(test_service_->Init());
     ASSERT_OK(krpc_stream_mgr->Init(test_service_->mem_tracker()));
-    ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
+    ASSERT_OK(rpc_mgr->StartServices());
   }
 
   void StopKrpcBackend() {
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index e80a2a7..22049b1 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -355,7 +355,7 @@ Status ExecEnv::Init() {
   // Initializes the RPCMgr, ControlServices and DataStreamServices.
   // Initialization needs to happen in the following order due to dependencies:
   // - RPC manager, DataStreamService and DataStreamManager.
-  RETURN_IF_ERROR(rpc_mgr_->Init());
+  RETURN_IF_ERROR(rpc_mgr_->Init(krpc_address_));
   control_svc_.reset(new ControlService(rpc_metrics_));
   RETURN_IF_ERROR(control_svc_->Init());
   data_svc_.reset(new DataStreamService(rpc_metrics_));
@@ -508,7 +508,7 @@ Status ExecEnv::StartStatestoreSubscriberService() {
 
 Status ExecEnv::StartKrpcService() {
   LOG(INFO) << "Starting KRPC service";
-  RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_));
+  RETURN_IF_ERROR(rpc_mgr_->StartServices());
   return Status::OK();
 }
 
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index dad65e1..6e95174 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -79,7 +79,7 @@ Status TestEnv::Init() {
   IpAddr ip_address;
   RETURN_IF_ERROR(HostnameToIpAddr(FLAGS_hostname, &ip_address));
   exec_env_->krpc_address_.__set_hostname(ip_address);
-  RETURN_IF_ERROR(exec_env_->rpc_mgr_->Init());
+  RETURN_IF_ERROR(exec_env_->rpc_mgr_->Init(exec_env_->krpc_address_));
   exec_env_->control_svc_.reset(new ControlService(exec_env_->rpc_metrics_));
   RETURN_IF_ERROR(exec_env_->control_svc_->Init());
 
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 08f20c2..0ba51bc 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -31,7 +31,6 @@
 #include "runtime/query-state.h"
 #include "service/client-request-state.h"
 #include "service/impala-server.h"
-#include "testutil/fault-injection-util.h"
 #include "util/debug-util.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index 890ceec..42b14f9 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -32,7 +32,6 @@
 #include "runtime/row-batch.h"
 #include "util/memory-metrics.h"
 #include "util/parse-util.h"
-#include "testutil/fault-injection-util.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
 #include "gen-cpp/data_stream_service.proxy.h"
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index ccd00a8..22a8243 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -25,7 +25,6 @@
 #include "runtime/query-state.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/exec-env.h"
-#include "testutil/fault-injection-util.h"
 
 #include "common/names.h"
 
diff --git a/be/src/testutil/CMakeLists.txt b/be/src/testutil/CMakeLists.txt
index 828cf6a..17885fa 100644
--- a/be/src/testutil/CMakeLists.txt
+++ b/be/src/testutil/CMakeLists.txt
@@ -26,7 +26,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/testutil")
 
 add_library(TestUtil
   desc-tbl-builder.cc
-  fault-injection-util.cc
   impalad-query-executor.cc
   in-process-servers.cc
   mini-kdc-wrapper.cc
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
deleted file mode 100644
index 48d3a9e..0000000
--- a/be/src/testutil/fault-injection-util.cc
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef NDEBUG
-
-#include "testutil/fault-injection-util.h"
-
-#include <random>
-
-#include <thrift/transport/TSSLSocket.h>
-#include <thrift/transport/TTransportException.h>
-
-#include "common/atomic.h"
-
-#include "common/names.h"
-
-DECLARE_int32(fault_injection_rpc_exception_type);
-
-namespace impala {
-
-using apache::thrift::transport::TTransportException;
-using apache::thrift::transport::TSSLException;
-
-void FaultInjectionUtil::InjectRpcException(bool is_send, int freq) {
-  static AtomicInt32 send_count(-1);
-  static AtomicInt32 recv_count(-1);
-  int32_t xcp_type = FLAGS_fault_injection_rpc_exception_type;
-  if (xcp_type == RPC_EXCEPTION_NONE) return;
-
-  // We currently support injecting exception at some RPCs only.
-  if (is_send) {
-    if (send_count.Add(1) % freq == 0) {
-      switch (xcp_type) {
-        case RPC_EXCEPTION_SEND_CLOSED_CONNECTION:
-          throw TTransportException(TTransportException::NOT_OPEN,
-              "Called write on non-open socket");
-        case RPC_EXCEPTION_SEND_TIMEDOUT:
-          throw TTransportException(TTransportException::TIMED_OUT,
-              "send timeout expired");
-        case RPC_EXCEPTION_SSL_SEND_CLOSED_CONNECTION:
-          throw TTransportException(TTransportException::NOT_OPEN);
-        case RPC_EXCEPTION_SSL_SEND_TIMEDOUT:
-          throw TSSLException("SSL_write: Resource temporarily unavailable");
-        // Simulate half-opened connections.
-        case RPC_EXCEPTION_SEND_STALE_CONNECTION:
-          throw TTransportException(TTransportException::END_OF_FILE,
-             "No more data to read.");
-        case RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION:
-          throw TSSLException("SSL_read: Connection reset by peer");
-        // fall through for the default case.
-      }
-    }
-  } else {
-    if (recv_count.Add(1) % freq == 0) {
-      switch (xcp_type) {
-        case RPC_EXCEPTION_RECV_CLOSED_CONNECTION:
-          throw TTransportException(TTransportException::NOT_OPEN,
-              "Called read on non-open socket");
-        case RPC_EXCEPTION_RECV_TIMEDOUT:
-          throw TTransportException(TTransportException::TIMED_OUT,
-              "EAGAIN (timed out)");
-        case RPC_EXCEPTION_SSL_RECV_CLOSED_CONNECTION:
-          throw TTransportException(TTransportException::NOT_OPEN);
-        case RPC_EXCEPTION_SSL_RECV_TIMEDOUT:
-          throw TSSLException("SSL_read: Resource temporarily unavailable");
-        // fall through for the default case.
-      }
-    }
-  }
-}
-
-}
-
-#endif
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
deleted file mode 100644
index 0ab77d0..0000000
--- a/be/src/testutil/fault-injection-util.h
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H
-#define IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H
-
-#include "util/time.h"
-
-namespace impala {
-
-#ifndef NDEBUG
-
-class FaultInjectionUtil {
- public:
-
-  enum RpcExceptionType {
-    RPC_EXCEPTION_NONE = 0,
-    RPC_EXCEPTION_SEND_CLOSED_CONNECTION,
-    RPC_EXCEPTION_SEND_STALE_CONNECTION,
-    RPC_EXCEPTION_SEND_TIMEDOUT,
-    RPC_EXCEPTION_RECV_CLOSED_CONNECTION,
-    RPC_EXCEPTION_RECV_TIMEDOUT,
-    RPC_EXCEPTION_SSL_SEND_CLOSED_CONNECTION,
-    RPC_EXCEPTION_SSL_SEND_STALE_CONNECTION,
-    RPC_EXCEPTION_SSL_SEND_TIMEDOUT,
-    RPC_EXCEPTION_SSL_RECV_CLOSED_CONNECTION,
-    RPC_EXCEPTION_SSL_RECV_TIMEDOUT,
-  };
-
-  /// Test util function that injects exceptions to RPC client functions.
-  /// 'is_send' indicates whether injected fault is at the send() or recv() of an RPC.
-  /// The exception specified in 'FLAGS_fault_injection_rpc_exception_type' is injected
-  /// on every 'freq' invocations of this function.
-  static void InjectRpcException(bool is_send, int freq);
-
-};
-
-#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)                 \
-    FaultInjectionUtil::InjectRpcException(true, freq)
-#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)                 \
-    FaultInjectionUtil::InjectRpcException(false, freq)
-
-#else // NDEBUG
-
-#define FAULT_INJECTION_SEND_RPC_EXCEPTION(freq)
-#define FAULT_INJECTION_RECV_RPC_EXCEPTION(freq)
-
-#endif
-
-}
-#endif // IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index 1bad6fc..822098e 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -26,10 +26,13 @@
 #include "common/version.h"
 #include "runtime/collection-value.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/tuple-row.h"
 #include "runtime/row-batch.h"
 #include "util/cpu-info.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/string-parser.h"
 #include "util/uid-util.h"
 #include "util/time.h"
@@ -329,63 +332,82 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) {
   return true;
 }
 
-Status DebugActionImpl(const string& debug_action, const char* label) {
+Status DebugActionImpl(
+    const string& debug_action, const char* label, const std::vector<string>& args) {
   const DebugActionTokens& action_list = TokenizeDebugActions(debug_action);
   static const char ERROR_MSG[] = "Invalid debug_action $0:$1 ($2)";
   for (const vector<string>& components : action_list) {
-    // size() != 2 check filters out ExecNode debug actions.
-    if (components.size() != 2 || !iequals(components[0], label)) continue;
+    // 'components' should be of the form {label, arg_0, ..., arg_n, action}
+    if (components.size() != 2 + args.size() || !iequals(components[0], label)) {
+      continue;
+    }
+    // Check if the arguments match.
+    bool matches = true;
+    for (int i = 0; i < args.size(); ++i) {
+      if (!iequals(components[i + 1], args[i])) {
+        matches = false;
+        break;
+      }
+    }
+    if (!matches) continue;
+    const string& action_str = components[args.size() + 1];
     // 'tokens' becomes {command, param0, param1, ... }
-    vector<string> tokens = TokenizeDebugActionParams(components[1]);
+    vector<string> tokens = TokenizeDebugActionParams(action_str);
     DCHECK_GE(tokens.size(), 1);
     const string& cmd = tokens[0];
     int sleep_millis = 0;
     if (iequals(cmd, "SLEEP")) {
       // SLEEP@<millis>
       if (tokens.size() != 2) {
-        return Status(Substitute(ERROR_MSG, components[0], components[1],
-                "expected SLEEP@<ms>"));
+        return Status(
+            Substitute(ERROR_MSG, components[0], action_str, "expected SLEEP@<ms>"));
       }
       sleep_millis = atoi(tokens[1].c_str());
     } else if (iequals(cmd, "JITTER")) {
       // JITTER@<millis>[@<probability>}
       if (tokens.size() < 2 || tokens.size() > 3) {
-        return Status(Substitute(ERROR_MSG, components[0], components[1],
-                "expected JITTER@<ms>[@<probability>]"));
+        return Status(Substitute(ERROR_MSG, components[0], action_str,
+            "expected JITTER@<ms>[@<probability>]"));
       }
       int max_millis = atoi(tokens[1].c_str());
       if (tokens.size() == 3) {
         bool should_execute = true;
         if (!ParseProbability(tokens[2], &should_execute)) {
-          return Status(Substitute(ERROR_MSG, components[0], components[1],
-                  "invalid probability"));
+          return Status(
+              Substitute(ERROR_MSG, components[0], action_str, "invalid probability"));
         }
         if (!should_execute) continue;
       }
       sleep_millis = rand() % (max_millis + 1);
     } else if (iequals(cmd, "FAIL")) {
-      // FAIL[@<probability>]
-      if (tokens.size() > 2) {
-        return Status(Substitute(ERROR_MSG, components[0], components[1],
-                "expected FAIL[@<probability>]"));
+      // FAIL[@<probability>][@<error message>]
+      if (tokens.size() > 3) {
+        return Status(Substitute(ERROR_MSG, components[0], action_str,
+            "expected FAIL[@<probability>][@<error message>]"));
       }
-      if (tokens.size() == 2) {
+      if (tokens.size() >= 2) {
         bool should_execute = true;
         if (!ParseProbability(tokens[1], &should_execute)) {
-          return Status(Substitute(ERROR_MSG, components[0], components[1],
-                  "invalid probability"));
+          return Status(
+              Substitute(ERROR_MSG, components[0], action_str, "invalid probability"));
         }
         if (!should_execute) continue;
       }
-      return Status(TErrorCode::INTERNAL_ERROR, Substitute("Debug Action: $0:$1",
-              components[0], components[1]));
+      string error_msg = tokens.size() == 3 ?
+          tokens[2] :
+          Substitute("Debug Action: $0:$1", components[0], action_str);
+
+      if (ImpaladMetrics::DEBUG_ACTION_NUM_FAIL != nullptr) {
+        ImpaladMetrics::DEBUG_ACTION_NUM_FAIL->Increment(1l);
+      }
+      return Status(TErrorCode::INTERNAL_ERROR, error_msg);
     } else {
-      return Status(Substitute(ERROR_MSG, components[0], components[1],
-              "invalid command"));
+      DCHECK(false) << "Invalid debug action";
+      return Status(Substitute(ERROR_MSG, components[0], action_str, "invalid command"));
     }
     if (sleep_millis > 0) {
-      VLOG(1) << Substitute("Debug Action: $0:$1 sleeping for $2 ms",
-          components[0], components[1], sleep_millis);
+      VLOG(1) << Substitute("Debug Action: $0:$1 sleeping for $2 ms", components[0],
+          action_str, sleep_millis);
       SleepForMs(sleep_millis);
     }
   }
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index f88495b..d0fcad8 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -142,16 +142,18 @@ DebugActionTokens TokenizeDebugActions(const string& debug_actions);
 std::vector<std::string> TokenizeDebugActionParams(const string& action);
 
 /// Slow path implementing DebugAction() for the case where 'debug_action' is non-empty.
-Status DebugActionImpl(const string& debug_action, const char* label) WARN_UNUSED_RESULT;
+Status DebugActionImpl(const string& debug_action, const char* label,
+    const std::vector<string>& args) WARN_UNUSED_RESULT;
 
 /// If debug_action query option has a "global action" (i.e. not exec-node specific)
-/// and matches the given 'label', apply the the action. See ImpalaService.thrift for
-/// details of the format and available global actions. For ExecNode code, use
-/// ExecNode::ExecDebugAction() instead.
-WARN_UNUSED_RESULT static inline Status DebugAction(
-    const string& debug_action, const char* label) {
+/// and matches the given 'label' and 'args', apply the action. See
+/// ImpalaService.thrift for details of the format and available global actions. For
+/// ExecNode code, use ExecNode::ExecDebugAction() instead. Will return OK unless either
+/// an invalid debug action is specified or the FAIL action is executed.
+WARN_UNUSED_RESULT static inline Status DebugAction(const string& debug_action,
+    const char* label, const std::vector<string>& args = std::vector<string>()) {
   if (LIKELY(debug_action.empty())) return Status::OK();
-  return DebugActionImpl(debug_action, label);
+  return DebugActionImpl(debug_action, label, args);
 }
 
 WARN_UNUSED_RESULT static inline Status DebugAction(
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index d190564..7238284 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -24,6 +24,7 @@
 
 #include "common/names.h"
 
+DECLARE_string(debug_actions);
 DECLARE_bool(use_local_catalog);
 
 namespace impala {
@@ -135,6 +136,7 @@ const char* ImpaladMetricKeys::HEDGED_READ_OPS =
     "impala-server.hedged-read-ops";
 const char* ImpaladMetricKeys::HEDGED_READ_OPS_WIN =
     "impala-server.hedged-read-ops-win";
+const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL = "impala.debug_action.fail";
 
 // These are created by impala-server during startup.
 // =======
@@ -168,6 +170,7 @@ IntCounter* ImpaladMetrics::CATALOG_CACHE_LOAD_SUCCESS_COUNT = nullptr;
 IntCounter* ImpaladMetrics::CATALOG_CACHE_MISS_COUNT = nullptr;
 IntCounter* ImpaladMetrics::CATALOG_CACHE_REQUEST_COUNT = nullptr;
 IntCounter* ImpaladMetrics::CATALOG_CACHE_TOTAL_LOAD_TIME = nullptr;
+IntCounter* ImpaladMetrics::DEBUG_ACTION_NUM_FAIL = nullptr;
 
 // Gauges
 IntGauge* ImpaladMetrics::CATALOG_NUM_DBS = nullptr;
@@ -354,6 +357,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
   HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
   HEDGED_READ_OPS_WIN = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS_WIN, 0);
 
+  if (!FLAGS_debug_actions.empty()) {
+    DEBUG_ACTION_NUM_FAIL = m->AddCounter(ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL, 0);
+  }
 }
 
 }
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index fafbb5f..7f20ccc 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -209,6 +209,10 @@ class ImpaladMetricKeys {
   /// (i.e. returned faster than original read).
   static const char* HEDGED_READ_OPS_WIN;
 
+  /// Total number of times the FAIL debug action is hit. The counter is only created if
+  /// --debug_actions is set.
+  static const char* DEBUG_ACTION_NUM_FAIL;
+
 };
 
 /// Global impalad-wide metrics.  This is useful for objects that want to update metrics
@@ -251,6 +255,7 @@ class ImpaladMetrics {
   static IntCounter* CATALOG_CACHE_MISS_COUNT;
   static IntCounter* CATALOG_CACHE_REQUEST_COUNT;
   static IntCounter* CATALOG_CACHE_TOTAL_LOAD_TIME;
+  static IntCounter* DEBUG_ACTION_NUM_FAIL;
 
   // Gauges
   static IntGauge* CATALOG_NUM_DBS;
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 33d50ba..3a733c5 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -89,15 +89,28 @@ enum TImpalaQueryOptions {
   //  invalid, the option is ignored.
   //
   // 2. Global actions
-  //  "<global label>:<command>@<param0>@<param1>@...<paramN>",
-  //  global labels are marked in the code with DEBUG_ACTION*() macros.
+  //  "<global label>:<arg0>:...:<argN>:<command>@<param0>@<param1>@...<paramN>",
+  //  Used with the DebugAction() call, the action will be performed if the label and
+  //  optional arguments all match. The arguments can be used to make the debug action
+  //  context dependent, for example to only fail rpcs when a particular hostname matches.
+  //  Note that some debug actions must be specified as a query option while others must
+  //  be passed in with the startup flag.
   //  Available global actions:
   //  - SLEEP@<ms> will sleep for the 'ms' milliseconds.
   //  - JITTER@<ms>[@<probability>] will sleep for a random amount of time between 0
   //    and 'ms' milliseconds with the given probability. If <probability> is omitted,
   //    it is 1.0.
-  //  - FAIL[@<probability>] returns an INTERNAL_ERROR status with the given
-  //    probability. If <probability> is omitted, it is 1.0.
+  //  - FAIL[@<probability>[@<error>]] returns an INTERNAL_ERROR status with the given
+  //    probability and error. If <probability> is omitted, it is 1.0 (and <error>
+  //    cannot be given). If <error> is omitted, a generic error of the form:
+  //    'Debug Action: <label>:<action>' is used. The code executing the debug action
+  //    may respond to different error messages by exercising different error paths.
+  // Examples:
+  // - CRS_BEFORE_ADMISSION:SLEEP@1000
+  //   Causes a 1 second sleep before queries are submitted to the admission controller.
+  // - IMPALA_SERVICE_POOL:127.0.0.1:27002:TransmitData:FAIL@0.1@REJECT_TOO_BUSY
+  //   Causes TransmitData rpcs to the third minicluster impalad to fail 10% of the time
+  //   with a "service too busy" error.
   //
   // Only a single ExecNode action is allowed, but multiple global actions can be
   // specified. To specify multiple actions, separate them with "|".
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index fc883a8..de1a8e1 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2618,5 +2618,15 @@
     "units": "NONE",
     "kind": "COUNTER",
     "key": "impala.webserver.total-cookie-auth-failure"
+  },
+  {
+    "description": "The number of times the FAIL debug action returned an error. For testing only.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "FAIL debug action hits",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.debug_action.fail"
   }
 ]
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 694c631..bbed25e 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -457,7 +457,7 @@ class ImpaladProcess(BaseImpalaProcess):
     super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
     self.service = ImpaladService(self.hostname, self.webserver_interface,
         self.get_webserver_port(), self.__get_beeswax_port(), self.__get_be_port(),
-        self.__get_hs2_port(), self.__get_hs2_http_port(),
+        self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(),
         self._get_webserver_certificate_file())
 
   def _get_default_webserver_port(self):
@@ -469,6 +469,9 @@ class ImpaladProcess(BaseImpalaProcess):
   def __get_be_port(self):
     return int(self._get_port('be_port', DEFAULT_BE_PORT))
 
+  def __get_krpc_port(self):
+    return int(self._get_port('krpc_port', DEFAULT_KRPC_PORT))
+
   def __get_hs2_port(self):
     return int(self._get_port('hs2_port', DEFAULT_HS2_PORT))
 
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index fb6beb2..9cb7f50 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -162,12 +162,13 @@ class BaseImpalaService(object):
 # new connections or accessing the debug webpage.
 class ImpaladService(BaseImpalaService):
   def __init__(self, hostname, webserver_interface="", webserver_port=25000,
-      beeswax_port=21000, be_port=22000, hs2_port=21050, hs2_http_port=28000,
-      webserver_certificate_file=""):
+      beeswax_port=21000, be_port=22000, krpc_port=27000, hs2_port=21050,
+      hs2_http_port=28000, webserver_certificate_file=""):
     super(ImpaladService, self).__init__(
         hostname, webserver_interface, webserver_port, webserver_certificate_file)
     self.beeswax_port = beeswax_port
     self.be_port = be_port
+    self.krpc_port = krpc_port
     self.hs2_port = hs2_port
     self.hs2_http_port = hs2_http_port
 
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index fc60c1a..07310b4 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -24,6 +24,16 @@ from tests.common.skip import SkipIf, SkipIfBuildType
 class TestRPCException(CustomClusterTestSuite):
   """Tests Impala exception handling in TransmitData() RPC to make sure no
      duplicated row batches are sent. """
+  # DataStreamService rpc names
+  TRANSMIT_DATA_RPC = "TransmitData"
+  END_DATA_STREAM_RPC = "EndDataStream"
+
+  # Error to specify for ImpalaServicePool to reject rpcs with a 'server too busy' error.
+  REJECT_TOO_BUSY_MSG = "REJECT_TOO_BUSY"
+
+  # The BE krpc port of the impalad these tests simulate rpc errors at.
+  KRPC_PORT = 27002
+
   # This query ends up calling TransmitData() more than 2048 times to ensure
   # proper test coverage.
   TEST_QUERY = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
@@ -40,55 +50,74 @@ class TestRPCException(CustomClusterTestSuite):
       pytest.skip('runs only in exhaustive')
     super(TestRPCException, cls).setup_class()
 
-  # Execute TEST_QUERY. If 'exception_string' is None, it's expected to complete
-  # sucessfully with result matching EXPECTED_RESULT. Otherwise, it's expected
-  # to fail with 'exception_string'.
-  def execute_test_query(self, exception_string):
-    try:
-      result = self.client.execute(self.TEST_QUERY)
-      assert result.data == self.EXPECTED_RESULT
-      assert not exception_string
-    except ImpalaBeeswaxException as e:
-      if exception_string is None:
-        raise e
-      assert exception_string in str(e)
+  def _get_num_fails(self, impalad):
+    num_fails = impalad.service.get_metric_value("impala.debug_action.fail")
+    if num_fails is None:
+      return 0
+    return num_fails
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=1")
-  def test_rpc_send_closed_connection(self, vector):
-    self.execute_test_query(None)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=2")
-  def test_rpc_send_stale_connection(self, vector):
-    self.execute_test_query(None)
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=3")
-  def test_rpc_send_timed_out(self, vector):
-    self.execute_test_query(None)
+  # Execute TEST_QUERY repeatedly until the FAIL debug action has been hit. If
+  # 'exception_string' is None, it's expected to always complete successfully with result
+  # matching EXPECTED_RESULT. Otherwise, it's expected to fail with 'exception_string' if
+  # the debug action has been hit.
+  def execute_test_query(self, exception_string):
+    impalad = self.cluster.impalads[2]
+    assert impalad.service.krpc_port == self.KRPC_PORT
+    # Re-run the query until the metrics show that we hit the debug action or we've run 10
+    # times. Each test in this file has at least a 50% chance of hitting the action per
+    # run, so there's at most a (1/2)^10 chance that this loop will fail spuriously.
+    i = 0
+    while self._get_num_fails(impalad) == 0 and i < 10:
+      i += 1
+      try:
+        result = self.client.execute(self.TEST_QUERY)
+        assert result.data == self.EXPECTED_RESULT
+        assert not exception_string or self._get_num_fails(impalad) == 0
+      except ImpalaBeeswaxException as e:
+        if exception_string is None:
+          raise e
+        assert exception_string in str(e)
+    assert self._get_num_fails(impalad) > 0
 
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=5")
-  def test_rpc_recv_timed_out(self, vector):
-    self.execute_test_query(None)
+  def _get_fail_action(rpc, error=None, port=KRPC_PORT, p=0.1):
+    """Returns a debug action that causes rpcs with the name 'rpc' that are sent to the
+    impalad at 'port' to FAIL with probability 'p' and return 'error' if specified."""
+    debug_action = "IMPALA_SERVICE_POOL:127.0.0.1:{port}:{rpc}:FAIL@{probability}" \
+        .format(rpc=rpc, probability=p, port=port)
+    if error is not None:
+      debug_action += "@" + error
+    return debug_action
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=6")
-  def test_rpc_secure_send_closed_connection(self, vector):
+  @CustomClusterTestSuite.with_args("--debug_actions=" +
+      _get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG))
+  def test_transmit_data_retry(self):
+    """Run a query where TransmitData may fail with a "server too busy" error. We should
+    always retry in this case, so the query should always eventually succeed."""
     self.execute_test_query(None)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=7")
-  def test_rpc_secure_send_stale_connection(self, vector):
-    self.execute_test_query(None)
+  @CustomClusterTestSuite.with_args("--debug_actions=" +
+      _get_fail_action(rpc=TRANSMIT_DATA_RPC, error=REJECT_TOO_BUSY_MSG) +
+      "|" + _get_fail_action(rpc=TRANSMIT_DATA_RPC))
+  def test_transmit_data_error(self):
+    """Run a query where TransmitData may fail with a "server too busy" or with a generic
+    error. The query should either succeed or fail with the given error."""
+    self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.1")
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=8")
-  def test_rpc_secure_send_timed_out(self, vector):
+  @CustomClusterTestSuite.with_args("--debug_actions=" +
+      _get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5))
+  def test_end_data_stream_retry(self):
+    """Run a query where EndDataStream may fail with a "server too busy" error. We should
+    always retry in this case, so the query should always eventually succeed."""
     self.execute_test_query(None)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--fault_injection_rpc_exception_type=10")
-  def test_rpc_secure_recv_timed_out(self, vector):
-    self.execute_test_query(None)
+  @CustomClusterTestSuite.with_args("--debug_actions=" +
+      _get_fail_action(rpc=END_DATA_STREAM_RPC, error=REJECT_TOO_BUSY_MSG, p=0.5) +
+      "|" + _get_fail_action(rpc=END_DATA_STREAM_RPC, p=0.5))
+  def test_end_data_stream_error(self):
+    """Run a query where EndDataStream may fail with a "server too busy" or with a generic
+    error. The query should either succeed or fail with the given error."""
+    self.execute_test_query("Debug Action: IMPALA_SERVICE_POOL:FAIL@0.5")