You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/18 20:29:28 UTC

incubator-impala git commit: IMPALA-3575: Add retry to backend connection request and rpc timeout

Repository: incubator-impala
Updated Branches:
  refs/heads/master 343bdad86 -> 2ab130aa0


IMPALA-3575: Add retry to backend connection request and rpc timeout

This patch adds a configurable timeout for all backend client
RPCs to avoid query hangs.

Prior to this change, Impala didn't set a socket send/recv timeout for
backend clients, so an RPC would wait forever for data. In extreme cases,
such as a bad network or a kernel panic on the destination host, the sender
never gets a response and the RPC hangs. Query hangs are hard to detect. If
the hang happens in ExecRemoteFragment() or CancelPlanFragments(), the query
cannot be cancelled unless you restart the coordinator.

Added send/recv timeouts to all RPCs to avoid query hangs. For the catalog
client, the default timeout stays at 0 (no timeout) because ExecDdl()
could take a very long time if a table has many partitions, mainly waiting
for HMS API calls.

Added a wrapper, RetryRpcRecv(), to wait longer for the receiver's
response. This is needed by certain RPCs. For example, for TransmitData()
called by DataStreamSender, the receiver could hold the response to apply
back pressure.

If an RPC fails, the connection is left in an unrecoverable state, so
we don't put the underlying connection back into the cache but close it
instead. This makes sure a broken connection won't cause more RPC failures.

Added retry for the CancelPlanFragment RPC. This reduces the chance that a
cancel request gets lost due to an unstable network, but it can make
cancellation take longer and make test_lifecycle.py more flaky.
The metric num-fragments-in-flight might not be 0 yet due to previous tests.
Modified the test to check the metric delta instead of comparing to 0 to
reduce flakiness. However, this might not capture some failures.

Besides the new EE test, I used the following iptables rules to
inject network failures and verify that RPCs never hang.
1. Block network traffic on a port completely
  iptables -A INPUT -p tcp -m tcp --dport 22002 -j DROP
2. Randomly drop 5% of TCP packets to slow down the network
  iptables -A INPUT -p tcp -m tcp --dport 22000 -m statistic --mode random --probability 0.05 -j DROP

Change-Id: Id6723cfe58df6217f4a9cdd12facd320cbc24964
Reviewed-on: http://gerrit.cloudera.org:8080/3343
Reviewed-by: Juan Yu <jy...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2ab130aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2ab130aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2ab130aa

Branch: refs/heads/master
Commit: 2ab130aa0a0d8b974a618899d162cd1d1fbd765d
Parents: 343bdad
Author: Juan Yu <jy...@cloudera.com>
Authored: Thu Jun 9 11:51:22 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon Jul 18 13:29:24 2016 -0700

----------------------------------------------------------------------
 be/src/common/global-flags.cc            |   5 ++
 be/src/rpc/thrift-util.cc                |   4 +-
 be/src/rpc/thrift-util.h                 |   4 +-
 be/src/runtime/client-cache.cc           |  20 +++++
 be/src/runtime/client-cache.h            |  88 +++++++++++++++---
 be/src/runtime/coordinator.cc            |  10 ++-
 be/src/runtime/data-stream-sender.cc     |  35 +++++---
 be/src/runtime/exec-env.cc               |  31 ++++++-
 be/src/service/fragment-exec-state.cc    |   6 +-
 be/src/service/impala-internal-service.h |   9 +-
 be/src/statestore/statestore.cc          |  22 +++--
 be/src/testutil/fault-injection-util.h   |  59 ++++++++++++
 be/src/util/error-util-test.cc           |  16 ++--
 common/thrift/generate_error_codes.py    |   2 +-
 tests/custom_cluster/test_rpc_timeout.py | 125 ++++++++++++++++++++++++++
 tests/query_test/test_lifecycle.py       |  15 ++--
 tests/verifiers/metric_verifier.py       |  10 ++-
 17 files changed, 397 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 296b338..869ba53 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -98,6 +98,11 @@ DEFINE_int32(stress_datastream_recvr_delay_ms, 0, "A stress option that causes d
 DEFINE_bool(skip_file_runtime_filtering, false, "Skips file-based runtime filtering in "
     "order to provide a regression test for IMPALA-3798. Effective in debug builds "
     "only.");
+DEFINE_int32(fault_injection_rpc_delay_ms, 0, "A fault injection option that causes "
+    "rpc server handling to be delayed to trigger an RPC timeout on the caller side. "
+    "Effective in debug builds only.");
+DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that specifies "
+    "which rpc call will be injected with the delay. Effective in debug builds only.");
 #endif
 
 DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/rpc/thrift-util.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index 4677237..3a5cff3 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -171,8 +171,8 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
   return false;
 }
 
-bool IsTimeoutTException(const TException& e) {
-  // String taken from Thrift's TSocket.cpp
+bool IsRecvTimeoutTException(const TException& e) {
+  // String taken from Thrift's TSocket.cpp, this only happens in TSocket::read()
   return strstr(e.what(), "EAGAIN (timed out)") != NULL;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/rpc/thrift-util.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h
index 40420c3..873674f 100644
--- a/be/src/rpc/thrift-util.h
+++ b/be/src/rpc/thrift-util.h
@@ -151,8 +151,8 @@ std::ostream& operator<<(std::ostream& out, const TColumnValue& colval);
 /// string representation
 bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b);
 
-/// Returns true if the TException corresponds to a TCP socket recv or send timeout.
-bool IsTimeoutTException(const apache::thrift::TException& e);
+/// Returns true if the TException corresponds to a TCP socket recv timeout.
+bool IsRecvTimeoutTException(const apache::thrift::TException& e);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 73ecc44..e244120 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -143,6 +143,26 @@ void ClientCacheHelper::ReleaseClient(ClientKey* client_key) {
   *client_key = NULL;
 }
 
+void ClientCacheHelper::DestroyClient(ClientKey* client_key) {
+  DCHECK(*client_key != NULL) << "Trying to destroy NULL client";
+  shared_ptr<ThriftClientImpl> client_impl;
+  ClientMap::iterator client;
+  {
+    lock_guard<mutex> lock(client_map_lock_);
+    client = client_map_.find(*client_key);
+    DCHECK(client != client_map_.end());
+    client_impl = client->second;
+  }
+  VLOG(1) << "Broken Connection, destroy client for " << client_impl->address();
+
+  client_impl->Close();
+  if (metrics_enabled_) total_clients_metric_->Increment(-1);
+  if (metrics_enabled_) clients_in_use_metric_->Increment(-1);
+  lock_guard<mutex> lock(client_map_lock_);
+  client_map_.erase(client);
+  *client_key = NULL;
+}
+
 void ClientCacheHelper::CloseConnections(const TNetworkAddress& address) {
   PerHostCache* cache;
   {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 0028167..53dd1f9 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -21,6 +21,7 @@
 #include <boost/unordered_map.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/bind.hpp>
+#include <gutil/strings/substitute.h>
 
 #include "runtime/client-cache-types.h"
 #include "util/metrics.h"
@@ -92,6 +93,10 @@ class ClientCacheHelper {
   /// next use they will have to be reopened via ReopenClient().
   void CloseConnections(const TNetworkAddress& address);
 
+  /// Close the client connection and don't put client back to per-host cache.
+  /// Also remove client from client_map_.
+  void DestroyClient(ClientKey* client_key);
+
   /// Return a debug representation of the contents of this cache.
   std::string DebugString();
 
@@ -190,14 +195,19 @@ template<class T>
 class ClientConnection {
  public:
   ClientConnection(ClientCache<T>* client_cache, TNetworkAddress address, Status* status)
-    : client_cache_(client_cache), client_(NULL) {
+    : client_cache_(client_cache), client_(NULL), address_(address),
+      client_is_unrecoverable_(false) {
     *status = client_cache_->GetClient(address, &client_);
     if (status->ok()) DCHECK(client_ != NULL);
   }
 
   ~ClientConnection() {
     if (client_ != NULL) {
-      client_cache_->ReleaseClient(&client_);
+      if (client_is_unrecoverable_) {
+        client_cache_->DestroyClient(&client_);
+      } else {
+        client_cache_->ReleaseClient(&client_);
+      }
     }
   }
 
@@ -213,40 +223,90 @@ class ClientConnection {
   /// depending on the error received from the first attempt.
   /// TODO: Detect already-closed cnxns and only retry in that case.
   ///
-  /// Returns RPC_TIMEOUT if a timeout occurred, RPC_CLIENT_CONNECT_FAILURE if the client
-  /// failed to connect, and RPC_GENERAL_ERROR if the RPC could not be completed for any
-  /// other reason (except for an unexpectedly closed cnxn, see TODO). Application-level
-  /// failures should be signalled through the response type.
-  //
+  /// retry_is_safe is an output parameter. In case of connection failure,
+  /// '*retry_is_safe' is set to true because the send never occurred and it's
+  /// safe to retry the RPC. Otherwise, it's set to false to indicate that the RPC was
+  /// in progress when it failed or the RPC was completed, therefore retrying the RPC
+  /// is not safe.
+  ///
+  /// Returns RPC_RECV_TIMEOUT if a timeout occurred while waiting for a response,
+  /// RPC_CLIENT_CONNECT_FAILURE if the client failed to connect, and RPC_GENERAL_ERROR
+  /// if the RPC could not be completed for any other reason (except for an unexpectedly
+  /// closed cnxn, see TODO).
+  /// Application-level failures should be signalled through the response type.
+  ///
   /// TODO: Use TTransportException::TTransportExceptionType to distinguish between
   /// failure modes.
   template <class F, class Request, class Response>
-  Status DoRpc(const F& f, const Request& request, Response* response) {
+  Status DoRpc(const F& f, const Request& request, Response* response,
+      bool* retry_is_safe = NULL) {
     DCHECK(response != NULL);
+    client_is_unrecoverable_ = true;
+    if (retry_is_safe != NULL) *retry_is_safe = false;
     try {
       (client_->*f)(*response, request);
+    } catch (const apache::thrift::TApplicationException& e) {
+      // TApplicationException only happens in recv RPC call.
+      // which means send RPC call is done, should not retry.
+      return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
     } catch (const apache::thrift::TException& e) {
-      if (IsTimeoutTException(e)) return Status(TErrorCode::RPC_TIMEOUT);
+      if (IsRecvTimeoutTException(e)) {
+        return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
+            "Client $0 timed-out during recv call.", TNetworkAddressToString(address_)));
+      }
+      VLOG(1) << "client " << client_ << " unexpected exception: "
+              << e.what() << ", type=" << typeid(e).name();
 
       // Client may have unexpectedly been closed, so re-open and retry.
       // TODO: ThriftClient should return proper error codes.
       const Status& status = Reopen();
       if (!status.ok()) {
+        if (retry_is_safe != NULL) *retry_is_safe = true;
         return Status(TErrorCode::RPC_CLIENT_CONNECT_FAILURE, status.GetDetail());
       }
       try {
         (client_->*f)(*response, request);
       } catch (apache::thrift::TException& e) {
         // By this point the RPC really has failed.
+        // TODO: Revisit this logic later. It's possible that the new connection
+        // works but we hit timeout here.
         return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
       }
     }
+    client_is_unrecoverable_ = false;
+    return Status::OK();
+  }
+
+  /// In certain cases, the server may take longer to provide an RPC response than
+  /// the configured socket timeout. Callers may wish to retry receiving the response.
+  /// This is safe if and only if DoRpc() returned RPC_RECV_TIMEOUT.
+  template <class F, class Response>
+  Status RetryRpcRecv(const F& recv_func, Response* response) {
+    DCHECK(response != NULL);
+    DCHECK(client_is_unrecoverable_);
+    try {
+      (client_->*recv_func)(*response);
+    } catch (const apache::thrift::TException& e) {
+      if (IsRecvTimeoutTException(e)) {
+        return Status(TErrorCode::RPC_RECV_TIMEOUT, strings::Substitute(
+            "Client $0 timed-out during recv call.", TNetworkAddressToString(address_)));
+      }
+      // If it's not timeout exception, then the connection is broken, stop retrying.
+      return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
+    }
+    client_is_unrecoverable_ = false;
     return Status::OK();
   }
 
  private:
   ClientCache<T>* client_cache_;
   T* client_;
+  TNetworkAddress address_;
+
+  /// Indicate the last rpc call sent by this connection succeeds or not. If the rpc call
+  /// fails for any reason, the connection could be left in a bad state and cannot be
+  /// recovered.
+  bool client_is_unrecoverable_;
 };
 
 /// Generic cache of Thrift clients for a given service type.
@@ -332,6 +392,12 @@ class ClientCache {
     return client_cache_helper_.ReleaseClient(reinterpret_cast<ClientKey*>(client));
   }
 
+  /// Destroy the client because it's left in an unrecoverable state after errors
+  /// in DoRpc() to avoid more rpc failure.
+  void DestroyClient(T** client) {
+    return client_cache_helper_.DestroyClient(reinterpret_cast<ClientKey*>(client));
+  }
+
   /// Factory method to produce a new ThriftClient<T> for the wrapped cache
   ThriftClientImpl* MakeClient(const TNetworkAddress& address, ClientKey* client_key,
       const std::string service_name, bool enable_ssl) {
@@ -343,10 +409,6 @@ class ClientCache {
 
 };
 
-/// Common cache / connection types
-class CatalogServiceClient;
-typedef ClientCache<CatalogServiceClient> CatalogServiceClientCache;
-typedef ClientConnection<CatalogServiceClient> CatalogServiceConnection;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6f7368c..6484d8c 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1486,8 +1486,14 @@ void Coordinator::CancelRemoteFragments() {
     VLOG_QUERY << "sending CancelPlanFragment rpc for instance_id="
                << exec_state->fragment_instance_id() << " backend="
                << exec_state->impalad_address();
-    Status rpc_status = backend_client.DoRpc(
-        &ImpalaBackendClient::CancelPlanFragment, params, &res);
+    Status rpc_status;
+    // Try to send the RPC 3 times before failing.
+    bool retry_is_safe;
+    for (int i = 0; i < 3; ++i) {
+      rpc_status = backend_client.DoRpc(&ImpalaBackendClient::CancelPlanFragment,
+          params, &res, &retry_is_safe);
+      if (rpc_status.ok() || !retry_is_safe) break;
+    }
     if (!rpc_status.ok()) {
       exec_state->status()->MergeStatus(rpc_status);
       stringstream msg;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 61e8b85..890b97c 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -66,7 +66,6 @@ class DataStreamSender::Channel {
           PlanNodeId dest_node_id, int buffer_size)
     : parent_(parent),
       buffer_size_(buffer_size),
-      client_cache_(NULL),
       row_desc_(row_desc),
       address_(MakeNetworkAddress(destination.hostname, destination.port)),
       fragment_instance_id_(fragment_instance_id),
@@ -111,8 +110,6 @@ class DataStreamSender::Channel {
   DataStreamSender* parent_;
   int buffer_size_;
 
-  ImpalaBackendClientCache* client_cache_;
-
   const RowDescriptor& row_desc_;
   TNetworkAddress address_;
   TUniqueId fragment_instance_id_;
@@ -137,20 +134,25 @@ class DataStreamSender::Channel {
   bool rpc_in_flight_;  // true if the rpc_thread_ is busy sending.
 
   Status rpc_status_;  // status of most recently finished TransmitData rpc
+  RuntimeState* runtime_state_;
 
   // Serialize batch_ into thrift_batch_ and send via SendBatch().
   // Returns SendBatch() status.
   Status SendCurrentBatch();
 
-  // Synchronously call TransmitData() on a client from client_cache_ and update
-  // rpc_status_ based on return value (or set to error if RPC failed).
+  // Synchronously call TransmitData() on a client from impalad_client_cache and
+  // update rpc_status_ based on return value (or set to error if RPC failed).
   // Called from a thread from the rpc_thread_ pool.
   void TransmitData(int thread_id, const TRowBatch*);
   void TransmitDataHelper(const TRowBatch*);
+
+  // Send RPC and retry waiting for response if get RPC timeout error.
+  Status DoTransmitDataRpc(ImpalaBackendConnection* client,
+      const TTransmitDataParams& params, TTransmitDataResult* res);
 };
 
 Status DataStreamSender::Channel::Init(RuntimeState* state) {
-  client_cache_ = state->impalad_client_cache();
+  runtime_state_ = state;
   // TODO: figure out how to size batch_
   int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1));
   batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker_.get()));
@@ -197,13 +199,13 @@ void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
   params.__set_eos(false);
   params.__set_sender_id(parent_->sender_id_);
 
-  ImpalaBackendConnection client(client_cache_, address_, &rpc_status_);
+  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
+      address_, &rpc_status_);
   if (!rpc_status_.ok()) return;
 
   TTransmitDataResult res;
   client->SetTransmitDataCounter(parent_->thrift_transmit_timer_);
-  rpc_status_ =
-      client.DoRpc(&ImpalaBackendClient::TransmitData, params, &res);
+  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
   client->ResetTransmitDataCounter();
   if (!rpc_status_.ok()) return;
   COUNTER_ADD(parent_->profile_->total_time_counter(),
@@ -218,6 +220,16 @@ void DataStreamSender::Channel::TransmitDataHelper(const TRowBatch* batch) {
   }
 }
 
+Status DataStreamSender::Channel::DoTransmitDataRpc(ImpalaBackendConnection* client,
+    const TTransmitDataParams& params, TTransmitDataResult* res) {
+  Status status = client->DoRpc(&ImpalaBackendClient::TransmitData, params, res);
+  while (status.code() == TErrorCode::RPC_RECV_TIMEOUT &&
+      !runtime_state_->is_cancelled()) {
+    status = client->RetryRpcRecv(&ImpalaBackendClient::recv_TransmitData, res);
+  }
+  return status;
+}
+
 void DataStreamSender::Channel::WaitForRpc() {
   SCOPED_TIMER(parent_->state_->total_network_send_timer());
   unique_lock<mutex> l(rpc_thread_lock_);
@@ -279,7 +291,8 @@ Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
   RETURN_IF_ERROR(GetSendStatus());
 
   Status client_cnxn_status;
-  ImpalaBackendConnection client(client_cache_, address_, &client_cnxn_status);
+  ImpalaBackendConnection client(runtime_state_->impalad_client_cache(),
+      address_, &client_cnxn_status);
   RETURN_IF_ERROR(client_cnxn_status);
 
   TTransmitDataParams params;
@@ -291,7 +304,7 @@ Status DataStreamSender::Channel::FlushAndSendEos(RuntimeState* state) {
   TTransmitDataResult res;
 
   VLOG_RPC << "calling TransmitData(eos=true) to terminate channel.";
-  rpc_status_ = client.DoRpc(&ImpalaBackendClient::TransmitData, params, &res);
+  rpc_status_ = DoTransmitDataRpc(&client, params, &res);
   if (!rpc_status_.ok()) {
     stringstream msg;
     msg << "TransmitData(eos=true) to " << address_ << " failed:\n" << rpc_status_.msg().msg();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0260db0..39b4f28 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -130,6 +130,18 @@ DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads availabl
 
 DECLARE_string(ssl_client_ca_certificate);
 
+DEFINE_int32(backend_client_connection_num_retries, 3, "Retry backend connections.");
+// When network is unstable, TCP will retry and sending could take longer time.
+// Choose 5 minutes as default timeout because we don't want RPC timeout be triggered
+// by intermittent network issue. The timeout should not be too long either, otherwise
+// query could hang for a while before it's cancelled.
+DEFINE_int32(backend_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
+    "TSocket send/recv timeout in milliseconds for a backend client RPC. ");
+
+DEFINE_int32(catalog_client_connection_num_retries, 3, "Retry catalog connections.");
+DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket "
+    "send/recv timeout in milliseconds for a catalog client RPC.");
+
 // The key for a variable set in Impala's test environment only, to allow the
 // resource-broker to correctly map node addresses into a form that Llama understand.
 const static string PSEUDO_DISTRIBUTED_CONFIG_KEY =
@@ -145,10 +157,14 @@ ExecEnv::ExecEnv()
   : metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
-        new ImpalaBackendClientCache(
+        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
+            0, FLAGS_backend_client_rpc_timeout_ms,
+            FLAGS_backend_client_rpc_timeout_ms,
             "", !FLAGS_ssl_client_ca_certificate.empty())),
     catalogd_client_cache_(
-        new CatalogServiceClientCache(
+        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
+            0, FLAGS_catalog_client_rpc_timeout_ms,
+            FLAGS_catalog_client_rpc_timeout_ms,
             "", !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
@@ -195,14 +211,21 @@ ExecEnv::ExecEnv()
   if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
 }
 
+// TODO: Need refactor to get rid of duplicated code.
 ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
                  int webserver_port, const string& statestore_host, int statestore_port)
   : metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
-        new ImpalaBackendClientCache("", !FLAGS_ssl_client_ca_certificate.empty())),
+        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
+            0, FLAGS_backend_client_rpc_timeout_ms,
+            FLAGS_backend_client_rpc_timeout_ms,
+            "", !FLAGS_ssl_client_ca_certificate.empty())),
     catalogd_client_cache_(
-        new CatalogServiceClientCache("", !FLAGS_ssl_client_ca_certificate.empty())),
+        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
+            0, FLAGS_catalog_client_rpc_timeout_ms,
+            FLAGS_catalog_client_rpc_timeout_ms,
+            "", !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver(webserver_port)),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index e80cd78..39bbbaa 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -113,18 +113,20 @@ void FragmentMgr::FragmentExecState::ReportStatusCb(
 
   TReportExecStatusResult res;
   Status rpc_status;
+  bool retry_is_safe;
   // Try to send the RPC 3 times before failing.
   for (int i = 0; i < 3; ++i) {
-    rpc_status = coord.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res);
+    rpc_status = coord.DoRpc(&ImpalaBackendClient::ReportExecStatus, params, &res,
+        &retry_is_safe);
     if (rpc_status.ok()) {
       rpc_status = Status(res.status);
       break;
     }
+    if (!retry_is_safe) break;
     if (i < 2) SleepForMs(100);
   }
   if (!rpc_status.ok()) {
     UpdateStatus(rpc_status);
-    // TODO: Do we really need to cancel?
     executor_.Cancel();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index d7bfca9..c4f20ab 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -21,6 +21,7 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "service/impala-server.h"
 #include "service/fragment-mgr.h"
+#include "testutil/fault-injection-util.h"
 
 namespace impala {
 
@@ -34,31 +35,37 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
 
   virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
       const TExecPlanFragmentParams& params) {
+    FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT);
     fragment_mgr_->ExecPlanFragment(params).SetTStatus(&return_val);
   }
 
   virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val,
       const TCancelPlanFragmentParams& params) {
+    FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT);
     fragment_mgr_->CancelPlanFragment(return_val, params);
   }
 
   virtual void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params) {
+    FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS);
     impala_server_->ReportExecStatus(return_val, params);
   }
 
   virtual void TransmitData(TTransmitDataResult& return_val,
       const TTransmitDataParams& params) {
+    FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
     impala_server_->TransmitData(return_val, params);
   }
 
   virtual void UpdateFilter(TUpdateFilterResult& return_val,
       const TUpdateFilterParams& params) {
-      impala_server_->UpdateFilter(return_val, params);
+    FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
+    impala_server_->UpdateFilter(return_val, params);
   }
 
   virtual void PublishFilter(TPublishFilterResult& return_val,
       const TPublishFilterParams& params) {
+    FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
     fragment_mgr_->PublishFilter(return_val, params);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index aa095c3..0830626 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -654,24 +654,22 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   Status status;
   if (is_heartbeat) {
     status = SendHeartbeat(subscriber.get());
-    if (status.code() == TErrorCode::RPC_TIMEOUT) {
-      // Rewrite status to make it more useful, while preserving the stack
-      status.SetErrorMsg(ErrorMsg(TErrorCode::RPC_TIMEOUT, Substitute(
-          "Subscriber $0 ($1) timed-out during heartbeat RPC. Timeout is $2s.",
-          subscriber->id(), lexical_cast<string>(subscriber->network_address()),
-          FLAGS_statestore_heartbeat_tcp_timeout_seconds)));
+    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+      // Add details to status to make it more useful, while preserving the stack
+      status.AddDetail(Substitute(
+          "Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
+          subscriber->id(), FLAGS_statestore_heartbeat_tcp_timeout_seconds));
     }
 
     deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
   } else {
     bool update_skipped;
     status = SendTopicUpdate(subscriber.get(), &update_skipped);
-    if (status.code() == TErrorCode::RPC_TIMEOUT) {
-      // Rewrite status to make it more useful, while preserving the stack
-      status.SetErrorMsg(ErrorMsg(TErrorCode::RPC_TIMEOUT, Substitute(
-          "Subscriber $0 ($1) timed-out during topic-update RPC. Timeout is $2s.",
-          subscriber->id(), lexical_cast<string>(subscriber->network_address()),
-          FLAGS_statestore_update_tcp_timeout_seconds)));
+    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
+      // Add details to status to make it more useful, while preserving the stack
+      status.AddDetail(Substitute(
+          "Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
+          subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
     }
     // If the subscriber responded that it skipped the last update sent, we assume that
     // it was busy doing something else, and back off slightly before sending another.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/testutil/fault-injection-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.h b/be/src/testutil/fault-injection-util.h
new file mode 100644
index 0000000..2fa5090
--- /dev/null
+++ b/be/src/testutil/fault-injection-util.h
@@ -0,0 +1,59 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H
+#define IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H
+
+#include "util/time.h"
+
+#ifndef NDEBUG
+  DECLARE_int32(fault_injection_rpc_delay_ms);
+  DECLARE_int32(fault_injection_rpc_type);
+#endif
+
+namespace impala {
+
+#ifndef NDEBUG
+  enum RpcCallType {
+    RPC_NULL = 0,
+    RPC_EXECPLANFRAGMENT,
+    RPC_CANCELPLANFRAGMENT,
+    RPC_PUBLISHFILTER,
+    RPC_UPDATEFILTER,
+    RPC_TRANSMITDATA,
+    RPC_REPORTEXECSTATUS,
+    RPC_RANDOM    // This must be last.
+  };
+
+  /// Test utility that delays an RPC server handler so that the RPC caller can hit
+  /// the RPC recv timeout. 'my_type' is the RPC type of the calling handler and
+  /// 'rpc_type' is the RPC type to delay; RPC_RANDOM picks a value in
+  /// [RPC_NULL, RPC_RANDOM) on every call. 'delay_ms' is the sleep time in
+  /// milliseconds; 0 disables injection and returns before the RNG is touched.
+  static void InjectRpcDelay(RpcCallType my_type, int32_t rpc_type, int32_t delay_ms) {
+    if (delay_ms == 0) return;
+    std::random_device rd;
+    srand(rd());
+    if (rpc_type == RPC_RANDOM) rpc_type = rand() % RPC_RANDOM;
+    if (rpc_type == my_type) SleepForMs(delay_ms);
+  }
+
+  #define FAULT_INJECTION_RPC_DELAY(type) InjectRpcDelay(type, \
+      FLAGS_fault_injection_rpc_type, FLAGS_fault_injection_rpc_delay_ms)
+#else
+  #define FAULT_INJECTION_RPC_DELAY(type)
+#endif
+
+}
+#endif // IMPALA_TESTUTIL_FAULT_INJECTION_UTIL_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/be/src/util/error-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/error-util-test.cc b/be/src/util/error-util-test.cc
index d6827db..6dd47ac 100644
--- a/be/src/util/error-util-test.cc
+++ b/be/src/util/error-util-test.cc
@@ -65,18 +65,18 @@ TEST(ErrorMsg, MergeMap) {
   ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
   ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
   cleared[TErrorCode::GENERAL].messages.push_back("1");
-  cleared[TErrorCode::RPC_TIMEOUT].messages.push_back("p");
+  cleared[TErrorCode::RPC_RECV_TIMEOUT].messages.push_back("p");
   ClearErrorMap(cleared);
   ASSERT_EQ(2, cleared.size());
-  ASSERT_EQ(1, cleared.count(TErrorCode::RPC_TIMEOUT));
+  ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
 
   MergeErrorMaps(&dummy, cleared);
   ASSERT_EQ(3, dummy.size());
   ASSERT_EQ(3, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
   ASSERT_EQ(1, dummy[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
-  ASSERT_EQ(1, dummy.count(TErrorCode::RPC_TIMEOUT));
-  ASSERT_EQ(0, dummy[TErrorCode::RPC_TIMEOUT].count);
-  ASSERT_EQ(0, dummy[TErrorCode::RPC_TIMEOUT].messages.size());
+  ASSERT_EQ(1, dummy.count(TErrorCode::RPC_RECV_TIMEOUT));
+  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].count);
+  ASSERT_EQ(0, dummy[TErrorCode::RPC_RECV_TIMEOUT].messages.size());
   ASSERT_EQ(0, dummy[TErrorCode::GENERAL].count);
   ASSERT_EQ(1, dummy[TErrorCode::GENERAL].messages.size());
 
@@ -84,9 +84,9 @@ TEST(ErrorMsg, MergeMap) {
   ASSERT_EQ(3, cleared.size());
   ASSERT_EQ(3, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].count);
   ASSERT_EQ(1, cleared[TErrorCode::PARQUET_MULTIPLE_BLOCKS].messages.size());
-  ASSERT_EQ(1, cleared.count(TErrorCode::RPC_TIMEOUT));
-  ASSERT_EQ(0, cleared[TErrorCode::RPC_TIMEOUT].count);
-  ASSERT_EQ(0, cleared[TErrorCode::RPC_TIMEOUT].messages.size());
+  ASSERT_EQ(1, cleared.count(TErrorCode::RPC_RECV_TIMEOUT));
+  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].count);
+  ASSERT_EQ(0, cleared[TErrorCode::RPC_RECV_TIMEOUT].messages.size());
   ASSERT_EQ(0, cleared[TErrorCode::GENERAL].count);
   ASSERT_EQ(1, cleared[TErrorCode::GENERAL].messages.size());
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 26ea78d..0fcd44f 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -114,7 +114,7 @@ error_codes = (
    "Verify that all your impalads are the same version."),
 
   ("RPC_GENERAL_ERROR", 31, "RPC Error: $0"),
-  ("RPC_TIMEOUT", 32, "RPC timed out"),
+  ("RPC_RECV_TIMEOUT", 32, "RPC recv timed out: $0"),
 
   ("UDF_VERIFY_FAILED", 33,
    "Failed to verify function $0 from LLVM module $1, see log for more details."),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/tests/custom_cluster/test_rpc_timeout.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
new file mode 100644
index 0000000..2bcf9e8
--- /dev/null
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -0,0 +1,125 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.common.skip import SkipIfBuildType
+from tests.verifiers.metric_verifier import MetricVerifier
+
+@SkipIfBuildType.not_dev_build
+class TestRPCTimeout(CustomClusterTestSuite):
+  """Tests timeout handling for every Impala RPC: queries should not hang and
+     all resources should be released."""
+  TEST_QUERY = "select count(c2.string_col) from \
+     functional.alltypestiny join functional.alltypessmall c2"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestRPCTimeout, cls).setup_class()
+
+  def execute_query_verify_metrics(self, query, repeat = 1):
+    """Executes 'query' 'repeat' times, ignoring ImpalaBeeswaxException; then, for each
+       impalad, waits for num-fragments-in-flight to reach 0 and verifies unused buffers."""
+    for _ in range(repeat):
+      try:
+        self.client.execute(query)
+      except ImpalaBeeswaxException:
+        pass
+    for verifier in [ MetricVerifier(d.service) for d in ImpalaCluster().impalads ]:
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+      verifier.verify_num_unused_buffers()
+
+  def execute_query_then_cancel(self, query, vector, repeat = 1):
+    """Fetches from and cancels 'query' 'repeat' times, then verifies cleanup."""
+    for _ in range(repeat):
+      handle = self.execute_query_async(query, vector.get_value('exec_option'))
+      try:  # Also covers fetch(), so a failed fetch cannot leak the open query.
+        self.client.fetch(query, handle)
+        try:
+          self.client.cancel(handle)
+        except ImpalaBeeswaxException:
+          pass
+      finally:
+        self.client.close_query(handle)
+    for v in [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ]:
+      v.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+      v.verify_num_unused_buffers()
+
+  def execute_runtime_filter_query(self):
+    query = "select STRAIGHT_JOIN * from functional_avro.alltypes a join \
+            [SHUFFLE] functional_avro.alltypes b on a.month = b.id \
+            and b.int_col = -3"
+    self.client.execute("SET RUNTIME_FILTER_MODE=GLOBAL")
+    self.client.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
+    self.client.execute("SET MAX_SCAN_RANGE_LENGTH=1024")
+    self.execute_query_verify_metrics(query)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=1")
+  def test_execplanfragment_timeout(self, vector):
+    """Delays RPC type 1 (RPC_EXECPLANFRAGMENT) by 3000ms with a 1000ms backend client
+       RPC timeout and expects each query to fail with a recv timeout error."""
+    for _ in range(3):
+      error = self.execute_query_expect_failure(self.client, self.TEST_QUERY)
+      assert "RPC recv timed out" in str(error)
+    for verifier in [ MetricVerifier(d.service) for d in ImpalaCluster().impalads ]:
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+      verifier.verify_num_unused_buffers()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=2")
+  def test_cancelplanfragment_timeout(self, vector):
+    """Cancels a query while RPC type 2 (RPC_CANCELPLANFRAGMENT) is delayed."""
+    self.execute_query_then_cancel("select * from tpch.lineitem limit 5000", vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=3")
+  def test_publishfilter_timeout(self, vector):
+    self.execute_runtime_filter_query()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=4")
+  def test_updatefilter_timeout(self, vector):
+    self.execute_runtime_filter_query()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=5")
+  def test_transmitdata_timeout(self, vector):
+    self.execute_query_verify_metrics(self.TEST_QUERY)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=6"
+      " --status_report_interval=1 ")
+  def test_reportexecstatus_timeout(self, vector):
+    self.execute_query_verify_metrics(self.TEST_QUERY)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=7")
+  def test_random_rpc_timeout(self, vector):
+    self.execute_query_verify_metrics(self.TEST_QUERY, 10)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/tests/query_test/test_lifecycle.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_lifecycle.py b/tests/query_test/test_lifecycle.py
index e8f01d3..6721dc1 100644
--- a/tests/query_test/test_lifecycle.py
+++ b/tests/query_test/test_lifecycle.py
@@ -21,7 +21,11 @@ from tests.verifiers.metric_verifier import MetricVerifier
 class TestFragmentLifecycle(ImpalaTestSuite):
   """Using the debug action interface, check that failed queries correctly clean up *all*
   fragments"""
+  # TODO: The metric num-fragments-in-flight might not be 0 yet due
+  # to previous tests. Wait for it to return to its initial value instead
+  # of 0 to reduce flakiness. However, this might miss some failures.
 
+  IN_FLIGHT_FRAGMENTS = "impala-server.num-fragments-in-flight"
   @classmethod
   def get_workload(self):
     return 'functional'
@@ -29,22 +33,24 @@ class TestFragmentLifecycle(ImpalaTestSuite):
   @pytest.mark.execute_serially
   def test_failure_in_prepare(self):
     # Fail the scan node
+    verifiers = [ MetricVerifier(i.service, [self.IN_FLIGHT_FRAGMENTS])
+        for i in ImpalaCluster().impalads ]
     self.client.execute("SET DEBUG_ACTION='-1:0:PREPARE:FAIL'");
     try:
       self.client.execute("SELECT COUNT(*) FROM functional.alltypes")
       assert "Query should have thrown an error"
     except ImpalaBeeswaxException:
       pass
-    verifiers = [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ]
 
     for v in verifiers:
-      v.wait_for_metric("impala-server.num-fragments-in-flight", 0)
+      v.wait_for_metric_reset(self.IN_FLIGHT_FRAGMENTS)
 
   @pytest.mark.execute_serially
   def test_failure_in_prepare_multi_fragment(self):
     # Test that if one fragment fails that the others are cleaned up during the ensuing
     # cancellation.
-
+    verifiers = [ MetricVerifier(i.service, [self.IN_FLIGHT_FRAGMENTS])
+        for i in ImpalaCluster().impalads ]
     # Fail the scan node
     self.client.execute("SET DEBUG_ACTION='-1:0:PREPARE:FAIL'");
 
@@ -56,10 +62,9 @@ class TestFragmentLifecycle(ImpalaTestSuite):
     except ImpalaBeeswaxException:
       pass
 
-    verifiers = [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ]
     for v in verifiers:
       # Long timeout required because fragments may be blocked while sending data, default
       # timeout is 60s before they wake up and cancel themselves.
       #
       # TODO: Fix when we have cancellable RPCs.
-      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=65)
+      v.wait_for_metric_reset(self.IN_FLIGHT_FRAGMENTS, timeout=65)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2ab130aa/tests/verifiers/metric_verifier.py
----------------------------------------------------------------------
diff --git a/tests/verifiers/metric_verifier.py b/tests/verifiers/metric_verifier.py
index bcde3f5..be6fe53 100644
--- a/tests/verifiers/metric_verifier.py
+++ b/tests/verifiers/metric_verifier.py
@@ -31,9 +31,12 @@ METRIC_LIST = [
 
 class MetricVerifier(object):
   """Reuseable class that can verify common metrics"""
-  def __init__(self, impalad_service):
+  def __init__(self, impalad_service, metrics_list=None):
     """Initialize module given an ImpalaService object"""
     self.impalad_service = impalad_service
+    # Snapshot of each listed metric's current value, used by wait_for_metric_reset().
+    self.initial_metrics = dict((m, self.impalad_service.get_metric_value(m))
+        for m in metrics_list or [])
 
   def verify_metrics_are_zero(self, timeout=60):
     """Test that all the metric in METRIC_LIST are 0"""
@@ -54,3 +57,8 @@ class MetricVerifier(object):
 
   def wait_for_metric(self, metric_name, expected_value, timeout=60):
     self.impalad_service.wait_for_metric_value(metric_name, expected_value, timeout)
+
+  def wait_for_metric_reset(self, metric_name, timeout=60):
+    if metric_name in self.initial_metrics:
+      self.impalad_service.wait_for_metric_value(metric_name,
+          self.initial_metrics[metric_name], timeout)