You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/01/07 18:04:21 UTC

[1/2] impala git commit: IMPALA-7931: fix executor shutdown races

Repository: impala
Updated Branches:
  refs/heads/master 7cc909221 -> e9652a48d


IMPALA-7931: fix executor shutdown races

There were two races:
* queries were terminated because of an impalad being detected
  as failed by the statestore even if the query had finished
  executing on that impalad.
* NUM_FRAGMENTS_IN_FLIGHT was used to detect the backend being
  idle, but it was decremented before the final status report
  was sent.

The fixes are:
* keep track of the backends that triggered the potential cancellation,
  and only proceed with the cancellation if the coordinator has fragments
  still executing on the backend.
* add a new metric that keeps track of the number of executing queries,
  which isn't decremented until the final status report is sent.

Also do some cleanup/improvements in this code:
* use proper error codes for some errors
* more overloads for Status::Expected()
* also add a metric for the total number of queries executed on the
  backend

Testing:
Add a new version of test_shutdown_executor with delays that
trigger both races. This test only runs in exhaustive mode to avoid
adding ~20s to core build time.

Ran exhaustive tests.

Looped test_restart_services overnight.

Change-Id: I7c1a80304cb6695d228aca8314e2231727ab1998
Reviewed-on: http://gerrit.cloudera.org:8080/12082
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a91b24cb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a91b24cb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a91b24cb

Branch: refs/heads/master
Commit: a91b24cb7962200f330c4887f38f4704a52f7c7e
Parents: 7cc9092
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Dec 13 15:37:26 2018 -0800
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Jan 5 04:14:36 2019 +0000

----------------------------------------------------------------------
 be/src/common/status.cc                       | 148 +++++++++++++------
 be/src/common/status.h                        | 108 ++++++++++----
 be/src/runtime/coordinator-backend-state.cc   |  31 ++--
 be/src/runtime/coordinator-backend-state.h    |   8 +-
 be/src/runtime/coordinator.cc                 |  17 +++
 be/src/runtime/coordinator.h                  |   5 +
 be/src/runtime/query-exec-mgr.cc              |   9 +-
 be/src/runtime/query-state.cc                 |  10 ++
 be/src/service/cancellation-work.h            | 101 +++++++++++++
 be/src/service/impala-server.cc               | 161 +++++++++++----------
 be/src/service/impala-server.h                |   6 +-
 be/src/util/impalad-metrics.cc                |  14 +-
 be/src/util/impalad-metrics.h                 |  12 ++
 common/thrift/ImpalaInternalService.thrift    |   3 +
 common/thrift/generate_error_codes.py         |  13 ++
 common/thrift/metrics.json                    |  26 +++-
 tests/custom_cluster/test_restart_services.py |  72 +++++++--
 17 files changed, 556 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index 4ef4f9b..b2a617f 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -51,76 +51,72 @@ Status Status::CancelledInternal(const char* subsystem) {
   return Status(ErrorMsg::Init(TErrorCode::CANCELLED_INTERNALLY, subsystem));
 }
 
-Status::Status(TErrorCode::type code)
-    : msg_(new ErrorMsg(code)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code) : msg_(new ErrorMsg(code)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0)
-    : msg_(new ErrorMsg(code, arg0)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0)
+  : msg_(new ErrorMsg(code, arg0)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1)
-    : msg_(new ErrorMsg(code, arg0, arg1)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(
+    bool silent, TErrorCode::type code, const ArgType& arg0, const ArgType& arg1)
+  : msg_(new ErrorMsg(code, arg0, arg1)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2)
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3)
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4)
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
     const ArgType& arg5)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
     const ArgType& arg5, const ArgType& arg6)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
     const ArgType& arg5, const ArgType& arg6, const ArgType& arg7)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6,
-    arg7)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-    const ArgType& arg8)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6,
-     arg7, arg8)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7, const ArgType& arg8)
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
-Status::Status(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
-    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-    const ArgType& arg8, const ArgType& arg9)
-    : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6,
-    arg7, arg8, arg9)) {
-  VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
+Status::Status(bool silent, TErrorCode::type code, const ArgType& arg0,
+    const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+    const ArgType& arg5, const ArgType& arg6, const ArgType& arg7, const ArgType& arg8,
+    const ArgType& arg9)
+  : msg_(new ErrorMsg(code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9)) {
+  if (!silent) VLOG(1) << msg_->msg() << "\n" << GetStackTrace();
 }
 
 Status::Status(const string& error_msg)
@@ -175,6 +171,62 @@ Status Status::Expected(const std::string& error_msg) {
   return Status(error_msg, true);
 }
 
+Status Status::Expected(TErrorCode::type code) {
+  return Status(true, code);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0) {
+  return Status(true, code, arg0);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1) {
+  return Status(true, code, arg0, arg1);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2) {
+  return Status(true, code, arg0, arg1, arg2);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3) {
+  return Status(true, code, arg0, arg1, arg2, arg3);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4, arg5);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4, arg5, arg6);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7, const ArgType& arg8) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+}
+
+Status Status::Expected(TErrorCode::type code, const ArgType& arg0, const ArgType& arg1,
+    const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+    const ArgType& arg6, const ArgType& arg7, const ArgType& arg8, const ArgType& arg9) {
+  return Status(true, code, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+}
+
 void Status::AddDetail(const std::string& msg) {
   DCHECK(msg_ != NULL);
   msg_->AddDetail(msg);

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 6c14ad4..3fd5b5b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -78,10 +78,6 @@ class StatusPB;
 ///
 ///   return Status::OK();
 /// }
-///
-/// TODO: macros:
-/// RETURN_IF_ERROR(status) << "msg"
-/// MAKE_ERROR() << "msg"
 class NODISCARD Status {
  public:
   typedef strings::internal::SubstituteArg ArgType;
@@ -115,36 +111,46 @@ class NODISCARD Status {
 
   /// Status using only the error code as a parameter. This can be used for error messages
   /// that don't take format parameters.
-  explicit Status(TErrorCode::type code);
+  explicit Status(TErrorCode::type code) : Status(false, code) {}
 
   /// These constructors are used if the caller wants to indicate a non-successful
-  /// execution and supply a client-facing error message. This is the preferred way of
-  /// instantiating a non-successful Status.
-  Status(TErrorCode::type error, const ArgType& arg0);
-  Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1);
+  /// execution and supply a client-facing error message. Using TErrorCode and
+  /// parameterised error messages is the preferred way of instantiating a
+  /// non-successful Status instead of the std::string constructor. These constructors
+  /// log the error message and a traceback, which can be expensive, so these should only
+  /// be used for rare, low-frequency errors. Status::Expected() does not log a traceback
+  /// and should be used for higher-frequency errors.
+  Status(TErrorCode::type error, const ArgType& arg0) : Status(false, error, arg0) {}
+  Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1)
+    : Status(false, error, arg0, arg1) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2);
+      const ArgType& arg2)
+    : Status(false, error, arg0, arg1, arg2) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3);
+      const ArgType& arg2, const ArgType& arg3)
+    : Status(false, error, arg0, arg1, arg2, arg3) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-      const ArgType& arg5);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4, arg5) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-      const ArgType& arg5, const ArgType& arg6);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4, arg5, arg6) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-      const ArgType& arg8);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7, const ArgType& arg8)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) {}
   Status(TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
-      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
-      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7,
-      const ArgType& arg8, const ArgType& arg9);
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7, const ArgType& arg8, const ArgType& arg9)
+    : Status(false, error, arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9) {}
 
   /// Used when the ErrorMsg is created as an intermediate value that is either passed to
   /// the Status or to the RuntimeState.
@@ -167,9 +173,37 @@ class NODISCARD Status {
   /// Retains the TErrorCode value and the message
   explicit Status(const apache::hive::service::cli::thrift::TStatus& hs2_status);
 
-  /// Create a status instance that represents an expected error and will not be logged
+  /// The below Status::Expected() functions create a status instance that represents
+  /// an expected error. They behave the same as the constructors with the same
+  /// argument types, except they do not log the error message and traceback.
   static Status Expected(const ErrorMsg& e);
   static Status Expected(const std::string& error_msg);
+  static Status Expected(TErrorCode::type error);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0);
+  static Status Expected(
+      TErrorCode::type error, const ArgType& arg0, const ArgType& arg1);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+      const ArgType& arg5);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+      const ArgType& arg5, const ArgType& arg6);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7, const ArgType& arg8);
+  static Status Expected(TErrorCode::type error, const ArgType& arg0,
+      const ArgType& arg1, const ArgType& arg2, const ArgType& arg3, const ArgType& arg4,
+      const ArgType& arg5, const ArgType& arg6, const ArgType& arg7, const ArgType& arg8,
+      const ArgType& arg9);
 
   /// same as copy c'tor
   ALWAYS_INLINE Status& operator=(const Status& status) {
@@ -271,10 +305,32 @@ class NODISCARD Status {
 
   static const char* LLVM_CLASS_NAME;
  private:
-
   // Status constructors that can suppress logging via 'silent' parameter
   Status(const ErrorMsg& error_msg, bool silent);
   Status(const std::string& error_msg, bool silent);
+  Status(bool silent, TErrorCode::type code);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7, const ArgType& arg8);
+  Status(bool silent, TErrorCode::type error, const ArgType& arg0, const ArgType& arg1,
+      const ArgType& arg2, const ArgType& arg3, const ArgType& arg4, const ArgType& arg5,
+      const ArgType& arg6, const ArgType& arg7, const ArgType& arg8, const ArgType& arg9);
 
   // A non-inline function for copying status' message.
   void CopyMessageFrom(const Status& status) noexcept;

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 6a09b7b..87224c1 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -257,7 +257,6 @@ void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
 void Coordinator::BackendState::LogFirstInProgress(
     std::vector<Coordinator::BackendState*> backend_states) {
   for (Coordinator::BackendState* backend_state : backend_states) {
-    lock_guard<mutex> l(backend_state->lock_);
     if (!backend_state->IsDone()) {
       VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id())
                  << ": first in-progress backend: "
@@ -267,7 +266,14 @@ void Coordinator::BackendState::LogFirstInProgress(
   }
 }
 
-inline bool Coordinator::BackendState::IsDone() const {
+bool Coordinator::BackendState::IsDone() {
+  unique_lock<mutex> lock(lock_);
+  return IsDoneLocked(lock);
+}
+
+inline bool Coordinator::BackendState::IsDoneLocked(
+    const unique_lock<boost::mutex>& lock) const {
+  DCHECK(lock.owns_lock() && lock.mutex() == &lock_);
   return num_remaining_instances_ == 0 || !status_.ok();
 }
 
@@ -282,7 +288,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   last_report_time_ms_ = MonotonicMillis();
 
   // If this backend completed previously, don't apply the update.
-  if (IsDone()) return false;
+  if (IsDoneLocked(lock)) return false;
 
   int idx = 0;
   const bool has_profile = thrift_profiles.profile_trees.size() > 0;
@@ -358,7 +364,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   }
 
   // TODO: keep backend-wide stopwatch?
-  return IsDone();
+  return IsDoneLocked(lock);
 }
 
 void Coordinator::BackendState::UpdateExecStats(
@@ -380,13 +386,13 @@ void Coordinator::BackendState::UpdateExecStats(
 }
 
 bool Coordinator::BackendState::Cancel() {
-  lock_guard<mutex> l(lock_);
+  unique_lock<mutex> l(lock_);
 
   // Nothing to cancel if the exec rpc was not sent
   if (!rpc_sent_) return false;
 
   // don't cancel if it already finished (for any reason)
-  if (IsDone()) return false;
+  if (IsDoneLocked(l)) return false;
 
   /// If the status is not OK, we still try to cancel - !OK status might mean
   /// communication failure between backend and coordinator, but fragment
@@ -432,12 +438,9 @@ bool Coordinator::BackendState::Cancel() {
 
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
   DCHECK(rpc_params.dst_query_id == query_id());
-  {
-    // If the backend is already done, it's not waiting for this filter, so we skip
-    // sending it in this case.
-    lock_guard<mutex> l(lock_);
-    if (IsDone()) return;
-  }
+  // If the backend is already done, it's not waiting for this filter, so we skip
+  // sending it in this case.
+  if (IsDone()) return;
 
   if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;
   Status status;
@@ -626,10 +629,10 @@ void Coordinator::FragmentStats::AddExecStats() {
 }
 
 void Coordinator::BackendState::ToJson(Value* value, Document* document) {
-  lock_guard<mutex> l(lock_);
+  unique_lock<mutex> l(lock_);
   ResourceUtilization resource_utilization = ComputeResourceUtilizationLocked();
   value->AddMember("num_instances", fragments_.size(), document->GetAllocator());
-  value->AddMember("done", IsDone(), document->GetAllocator());
+  value->AddMember("done", IsDoneLocked(l), document->GetAllocator());
   value->AddMember("peak_per_host_mem_consumption",
       resource_utilization.peak_per_host_mem_consumption, document->GetAllocator());
   value->AddMember("bytes_read", resource_utilization.bytes_read,

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index b4697cb..1363701 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -113,6 +113,10 @@ class Coordinator::BackendState {
   Status GetStatus(bool* is_fragment_failure = nullptr,
       TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
 
+  /// Return true if execution at this backend is done. Thread-safe. Caller must not hold
+  /// lock_.
+  bool IsDone();
+
   /// Return peak memory consumption and aggregated resource usage across all fragment
   /// instances for this backend.
   ResourceUtilization ComputeResourceUtilization();
@@ -283,8 +287,8 @@ class Coordinator::BackendState {
       const FilterRoutingTable& filter_routing_table,
       TExecQueryFInstancesParams* rpc_params);
 
-  /// Return true if execution at this backend is done. Caller must hold lock_.
-  bool IsDone() const;
+  /// Version of IsDone() where caller must hold lock_ via 'lock'.
+  bool IsDoneLocked(const boost::unique_lock<boost::mutex>& lock) const;
 
   /// Same as ComputeResourceUtilization() but caller must hold lock.
   ResourceUtilization ComputeResourceUtilizationLocked();

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7d0c0e0..6192a59 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -17,6 +17,8 @@
 
 #include "runtime/coordinator.h"
 
+#include <unordered_set>
+
 #include <thrift/protocol/TDebugProtocol.h>
 #include <boost/algorithm/string/join.hpp>
 #include <boost/filesystem.hpp>
@@ -825,6 +827,21 @@ Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization()
   return query_resource_utilization;
 }
 
+vector<TNetworkAddress> Coordinator::GetActiveBackends(
+    const vector<TNetworkAddress>& candidates) {
+  // Build a set so that runtime is O(candidates.size() + backend_states_.size()).
+  unordered_set<TNetworkAddress> candidate_set(candidates.begin(), candidates.end());
+  vector<TNetworkAddress> result;
+  lock_guard<SpinLock> l(backend_states_init_lock_);
+  for (BackendState* backend_state : backend_states_) {
+    if (candidate_set.find(backend_state->impalad_address()) != candidate_set.end()
+        && !backend_state->IsDone()) {
+      result.push_back(backend_state->impalad_address());
+    }
+  }
+  return result;
+}
+
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   shared_lock<shared_mutex> lock(filter_lock_);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 3ab233a..9be32fa 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -199,6 +199,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// latest status reports received from those backends).
   ResourceUtilization ComputeQueryResourceUtilization();
 
+  /// Return the backends in 'candidates' that still have at least one fragment instance
+  /// executing on them. The returned backends may not be in the same order as the input.
+  std::vector<TNetworkAddress> GetActiveBackends(
+      const std::vector<TNetworkAddress>& candidates);
+
  private:
   class BackendState;
   struct FilterTarget;

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index ebd17cd..7dab8a2 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -109,7 +109,10 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
 
     auto it = map_ref->find(query_ctx.query_id);
     if (it == map_ref->end()) {
-      // register new QueryState
+      // Register new QueryState. This marks when the query first starts executing on
+      // this backend.
+      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED->Increment(1);
+      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(1);
       qs = new QueryState(query_ctx, mem_limit);
       map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
@@ -177,7 +180,9 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
     if (cnt > 0) return;
     map_ref->erase(it);
   }
-  // TODO: send final status report during gc, but do this from a different thread
   delete qs_from_map;
   VLOG(1) << "ReleaseQueryState(): deleted query_id=" << PrintId(query_id);
+  // BACKEND_NUM_QUERIES_EXECUTING is used to detect the backend being quiesced, so we
+  // decrement it after we're completely done with the query.
+  ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(-1);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index f706796..ed5a9f7 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -61,6 +61,9 @@ DEFINE_int32(report_status_retry_interval_ms, 100,
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
 
+DEFINE_int32_hidden(stress_status_report_delay_ms, 0, "Stress option to inject a delay "
+    "before status reports. Has no effect on release builds.");
+
 using namespace impala;
 
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
@@ -291,6 +294,13 @@ void QueryState::ConstructReport(bool instances_started,
 }
 
 void QueryState::ReportExecStatus() {
+#ifndef NDEBUG
+  if (FLAGS_stress_status_report_delay_ms) {
+    LOG(INFO) << "Sleeping " << FLAGS_stress_status_report_delay_ms << "ms before "
+              << "reporting for query " << PrintId(query_id());
+    SleepForMs(FLAGS_stress_status_report_delay_ms);
+  }
+#endif
   bool instances_started = fis_map_.size() > 0;
 
   // This will send a report even if we are cancelled.  If the query completed correctly

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/service/cancellation-work.h
----------------------------------------------------------------------
diff --git a/be/src/service/cancellation-work.h b/be/src/service/cancellation-work.h
new file mode 100644
index 0000000..f4a8727
--- /dev/null
+++ b/be/src/service/cancellation-work.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+/// Categorisation of causes of cancellation.
+enum class CancellationWorkCause {
+  // The Impala Server terminated the query, e.g. because it exceeded a timeout or
+  // resource limit.
+  TERMINATED_BY_SERVER,
+  // The query is being terminated because a backend failed. We can skip cancelling the
+  // query if the fragment instances running on that backend all completed.
+  BACKEND_FAILED
+};
+
+/// Work item for ImpalaServer::cancellation_thread_pool_.
+/// This class needs to support move construction and assignment for use in ThreadPool.
+class CancellationWork {
+ public:
+  // Empty constructor needed to make ThreadPool happy.
+  CancellationWork()
+    : cause_(CancellationWorkCause::TERMINATED_BY_SERVER), unregister_(false) {}
+
+  // Construct a TERMINATED_BY_SERVER CancellationWork instance.
+  static CancellationWork TerminatedByServer(
+      const TUniqueId& query_id, const Status& error, bool unregister) {
+    return CancellationWork(
+        query_id, CancellationWorkCause::TERMINATED_BY_SERVER, error, {}, unregister);
+  }
+
+  // Construct a BACKEND_FAILED CancellationWork instance.
+  static CancellationWork BackendFailure(const TUniqueId& query_id,
+      const std::vector<TNetworkAddress>& failed_backends) {
+    return CancellationWork(query_id, CancellationWorkCause::BACKEND_FAILED, Status::OK(),
+        failed_backends, false);
+  }
+
+  const TUniqueId& query_id() const { return query_id_; }
+  CancellationWorkCause cause() const { return cause_; }
+  const Status& error() const {
+    DCHECK_ENUM_EQ(cause_, CancellationWorkCause::TERMINATED_BY_SERVER);
+    return error_;
+  }
+  const std::vector<TNetworkAddress>& failed_backends() const {
+    DCHECK_ENUM_EQ(cause_, CancellationWorkCause::BACKEND_FAILED);
+    return failed_backends_;
+  }
+  bool unregister() const { return unregister_; }
+
+ private:
+  CancellationWork(const TUniqueId& query_id, CancellationWorkCause cause,
+      const Status& error, const std::vector<TNetworkAddress>& failed_backends,
+      bool unregister)
+    : query_id_(query_id),
+      cause_(cause),
+      error_(error),
+      failed_backends_(failed_backends),
+      unregister_(unregister) {
+    DCHECK(cause_ != CancellationWorkCause::TERMINATED_BY_SERVER || !error.ok());
+    DCHECK(cause_ != CancellationWorkCause::BACKEND_FAILED || !failed_backends.empty());
+  }
+
+  // ID of query to be cancelled.
+  TUniqueId query_id_;
+
+  // Cause of the cancellation.
+  CancellationWorkCause cause_;
+
+  // If 'cause_' is TERMINATED_BY_SERVER, the error containing a human-readable
+  // explanation of the cancellation. Otherwise not used.
+  Status error_;
+
+  // If 'cause_' is BACKEND_FAILED, all of the backends that were detected to have
+  // failed. Otherwise not used.
+  std::vector<TNetworkAddress> failed_backends_;
+
+  // If true, unregister the query after cancelling it.
+  bool unregister_;
+};
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index f003dd5..c7e52ae 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -61,6 +61,7 @@
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
+#include "service/cancellation-work.h"
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
@@ -260,41 +261,6 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 // Interval between checks for query expiration.
 const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
 
-// Work item for ImpalaServer::cancellation_thread_pool_.
-class CancellationWork {
- public:
-  CancellationWork(const TUniqueId& query_id, const Status& cause, bool unregister)
-      : query_id_(query_id), cause_(cause), unregister_(unregister) {
-  }
-
-  CancellationWork() {
-  }
-
-  const TUniqueId& query_id() const { return query_id_; }
-  const Status& cause() const { return cause_; }
-  bool unregister() const { return unregister_; }
-
-  bool operator<(const CancellationWork& other) const {
-    return query_id_ < other.query_id_;
-  }
-
-  bool operator==(const CancellationWork& other) const {
-    return query_id_ == other.query_id_;
-  }
-
- private:
-  // Id of query to be canceled.
-  TUniqueId query_id_;
-
-  // Error status containing a list of failed impalads causing the cancellation.
-  Status cause_;
-
-  // If true, unregister the query rather than cancelling it. Calling UnregisterQuery()
-  // does call CancelInternal eventually, but also ensures that the query is torn down and
-  // archived.
-  bool unregister_;
-};
-
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
       thrift_serializer_(false),
@@ -1418,16 +1384,57 @@ TQueryOptions ImpalaServer::SessionState::QueryOptions() {
 
 void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
     const CancellationWork& cancellation_work) {
+  const TUniqueId& query_id = cancellation_work.query_id();
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
+  // Query was already unregistered.
+  if (request_state == nullptr) {
+    VLOG_QUERY << "CancelFromThreadPool(): query " << PrintId(query_id)
+               << " already unregistered.";
+    return;
+  }
+
+  Status error;
+  switch (cancellation_work.cause()) {
+    case CancellationWorkCause::TERMINATED_BY_SERVER:
+      error = cancellation_work.error();
+      break;
+    case CancellationWorkCause::BACKEND_FAILED: {
+      // We only want to proceed with cancellation if the backends are still in use for
+      // the query.
+      vector<TNetworkAddress> active_backends;
+      Coordinator* coord = request_state->GetCoordinator();
+      if (coord == nullptr) {
+        // Query hasn't started yet - it still will run on all backends.
+        active_backends = cancellation_work.failed_backends();
+      } else {
+        active_backends = coord->GetActiveBackends(cancellation_work.failed_backends());
+      }
+      if (active_backends.empty()) {
+        VLOG_QUERY << "CancelFromThreadPool(): all failed backends already completed for "
+                   << "query " << PrintId(query_id);
+        return;
+      }
+      stringstream msg;
+      for (int i = 0; i < active_backends.size(); ++i) {
+        msg << TNetworkAddressToString(active_backends[i]);
+        if (i + 1 != active_backends.size()) msg << ", ";
+      }
+      error = Status::Expected(TErrorCode::UNREACHABLE_IMPALADS, msg.str());
+      break;
+    }
+    default:
+      DCHECK(false) << static_cast<int>(cancellation_work.cause());
+  }
+
   if (cancellation_work.unregister()) {
-    Status status = UnregisterQuery(cancellation_work.query_id(), true,
-        &cancellation_work.cause());
+    Status status = UnregisterQuery(cancellation_work.query_id(), true, &error);
     if (!status.ok()) {
       VLOG_QUERY << "Query de-registration (" << PrintId(cancellation_work.query_id())
                  << ") failed";
     }
   } else {
-    Status status = CancelInternal(cancellation_work.query_id(), true,
-        &cancellation_work.cause());
+    VLOG_QUERY << "CancelFromThreadPool(): cancelling query_id=" << PrintId(query_id);
+    Status status = request_state->Cancel(true, &error);
     if (!status.ok()) {
       VLOG_QUERY << "Query cancellation (" << PrintId(cancellation_work.query_id())
                  << ") did not succeed: " << status.GetDetail();
@@ -1787,16 +1794,16 @@ void ImpalaServer::MembershipCallback(
       for (cancellation_entry = queries_to_cancel.begin();
           cancellation_entry != queries_to_cancel.end();
           ++cancellation_entry) {
-        stringstream cause_msg;
-        cause_msg << "Failed due to unreachable impalad(s): ";
+        stringstream backends_ss;
         for (int i = 0; i < cancellation_entry->second.size(); ++i) {
-          cause_msg << TNetworkAddressToString(cancellation_entry->second[i]);
-          if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
+          backends_ss << TNetworkAddressToString(cancellation_entry->second[i]);
+          if (i + 1 != cancellation_entry->second.size()) backends_ss << ", ";
         }
-        string cause_str = cause_msg.str();
-        LOG(INFO) << "Query " << PrintId(cancellation_entry->first) << ": " << cause_str;
-        cancellation_thread_pool_->Offer(CancellationWork(cancellation_entry->first,
-            Status::Expected(cause_msg.str()), false));
+        VLOG_QUERY << "Backends failed for query " << PrintId(cancellation_entry->first)
+                   << ", adding to queue to check for cancellation: "
+                   << backends_ss.str();
+        cancellation_thread_pool_->Offer(CancellationWork::BackendFailure(
+            cancellation_entry->first, cancellation_entry->second));
       }
     }
   }
@@ -2048,9 +2055,10 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
               session_state.second->inflight_queries.end());
         }
         // Unregister all open queries from this session.
-        Status status = Status::Expected("Session expired due to inactivity");
-        for (const TUniqueId& query_id: inflight_queries) {
-          cancellation_thread_pool_->Offer(CancellationWork(query_id, status, true));
+        Status status = Status::Expected(TErrorCode::INACTIVE_SESSION_EXPIRED);
+        for (const TUniqueId& query_id : inflight_queries) {
+          cancellation_thread_pool_->Offer(
+              CancellationWork::TerminatedByServer(query_id, status, true));
         }
       }
     }
@@ -2113,11 +2121,10 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
           int32_t exec_time_limit_s = crs->query_options().exec_time_limit_s;
           VLOG_QUERY << "Expiring query " << PrintId(expiration_event->query_id)
                      << " due to execution time limit of " << exec_time_limit_s << "s.";
-          const string& err_msg = Substitute(
-              "Query $0 expired due to execution time limit of $1",
-              PrintId(expiration_event->query_id),
-              PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S));
-          ExpireQuery(crs.get(), Status::Expected(err_msg));
+          ExpireQuery(crs.get(),
+              Status::Expected(TErrorCode::EXEC_TIME_LIMIT_EXCEEDED,
+                  PrintId(expiration_event->query_id),
+                  PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S)));
           expiration_event = queries_by_timestamp_.erase(expiration_event);
           continue;
         }
@@ -2152,15 +2159,13 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
           }
         } else if (!crs->is_active()) {
           // Otherwise time to expire this query
-          VLOG_QUERY
-              << "Expiring query due to client inactivity: "
-              << PrintId(expiration_event->query_id) << ", last activity was at: "
-              << ToStringFromUnixMillis(crs->last_active_ms());
-          const string& err_msg = Substitute(
-              "Query $0 expired due to client inactivity (timeout is $1)",
-              PrintId(expiration_event->query_id),
-              PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
-          ExpireQuery(crs.get(), Status::Expected(err_msg));
+          VLOG_QUERY << "Expiring query due to client inactivity: "
+                     << PrintId(expiration_event->query_id) << ", last activity was at: "
+                     << ToStringFromUnixMillis(crs->last_active_ms());
+          ExpireQuery(crs.get(),
+              Status::Expected(TErrorCode::INACTIVE_QUERY_EXPIRED,
+                  PrintId(expiration_event->query_id),
+                  PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S)));
           expiration_event = queries_by_timestamp_.erase(expiration_event);
         } else {
           // Iterator is moved on in every other branch.
@@ -2187,20 +2192,19 @@ Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
   int64_t cpu_limit_s = crs->query_options().cpu_limit_s;
   int64_t cpu_limit_ns = cpu_limit_s * 1000'000'000L;
   if (cpu_limit_ns > 0 && cpu_time_ns > cpu_limit_ns) {
-    const string& err_msg = Substitute("Query $0 terminated due to CPU limit of $1",
+    Status err = Status::Expected(TErrorCode::CPU_LIMIT_EXCEEDED,
         PrintId(crs->query_id()), PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S));
-    VLOG_QUERY << err_msg;
-    return Status::Expected(err_msg);
+    VLOG_QUERY << err.msg().msg();
+    return err;
   }
 
   int64_t scan_bytes = utilization.bytes_read;
   int64_t scan_bytes_limit = crs->query_options().scan_bytes_limit;
   if (scan_bytes_limit > 0 && scan_bytes > scan_bytes_limit) {
-    const string& err_msg = Substitute(
-        "Query $0 terminated due to scan bytes limit of $1", PrintId(crs->query_id()),
-        PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES));
-    VLOG_QUERY << err_msg;
-    return Status::Expected(err_msg);
+    Status err = Status::Expected(TErrorCode::SCAN_BYTES_LIMIT_EXCEEDED,
+        PrintId(crs->query_id()), PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES));
+    VLOG_QUERY << err.msg().msg();
+    return err;
   }
   // Query is within the resource limits, check again later.
   return Status::OK();
@@ -2208,7 +2212,8 @@ Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
 
 void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
   DCHECK(!status.ok());
-  cancellation_thread_pool_->Offer(CancellationWork(crs->query_id(), status, false));
+  cancellation_thread_pool_->Offer(
+      CancellationWork::TerminatedByServer(crs->query_id(), status, false));
   ImpaladMetrics::NUM_QUERIES_EXPIRED->Increment(1L);
   crs->set_expired();
 }
@@ -2409,15 +2414,19 @@ TShutdownStatus ImpalaServer::GetShutdownStatus() const {
   result.finstances_executing =
       ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue();
   result.client_requests_registered = ImpaladMetrics::NUM_QUERIES_REGISTERED->GetValue();
+  result.backend_queries_executing =
+      ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->GetValue();
   return result;
 }
 
 string ImpalaServer::ShutdownStatusToString(const TShutdownStatus& shutdown_status) {
   return Substitute("startup grace period left: $0, deadline left: $1, "
-      "fragment instances: $2, queries registered: $3",
+      "queries registered on coordinator: $2, queries executing: $3, "
+      "fragment instances: $4",
       PrettyPrinter::Print(shutdown_status.grace_remaining_ms, TUnit::TIME_MS),
       PrettyPrinter::Print(shutdown_status.deadline_remaining_ms, TUnit::TIME_MS),
-      shutdown_status.finstances_executing, shutdown_status.client_requests_registered);
+      shutdown_status.client_requests_registered,
+      shutdown_status.backend_queries_executing, shutdown_status.finstances_executing);
 }
 
 Status ImpalaServer::StartShutdown(
@@ -2467,7 +2476,7 @@ Status ImpalaServer::StartShutdown(
     TShutdownStatus shutdown_status = GetShutdownStatus();
     LOG(INFO) << "Shutdown status: " << ShutdownStatusToString(shutdown_status);
     if (shutdown_status.grace_remaining_ms <= 0
-        && shutdown_status.finstances_executing == 0
+        && shutdown_status.backend_queries_executing == 0
         && shutdown_status.client_requests_registered == 0) {
       break;
     } else if (shutdown_status.deadline_remaining_ms <= 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index ebbf122..d9b4b05 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -121,8 +121,10 @@ class QuerySchedule;
 ///     -> Queuing delay in the admission controller (which may be unbounded).
 /// 3. The startup grace period elapses.
 /// 4. The background shutdown thread periodically checks to see if the Impala daemon is
-///    quiesced (i.e. no client requests are registered and no fragment instances are
-///    executing). If it is quiesced then it cleanly shuts down by exiting the process.
+///    quiesced (i.e. no client requests are registered and no queries are executing on
+///    the backend). If it is quiesced then it cleanly shuts down by exiting the process.
+///    The statestore will detect that the process is not responding to heartbeats and
+///    remove any entries.
 /// 5. The shutdown deadline elapses. The Impala daemon exits regardless of whether
 ///    it was successfully quiesced or not.
 ///

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 6ac9401..e4386ca 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -34,6 +34,10 @@ const char* ImpaladMetricKeys::IMPALA_SERVER_READY = "impala-server.ready";
 const char* ImpaladMetricKeys::IMPALA_SERVER_NUM_QUERIES = "impala-server.num-queries";
 const char* ImpaladMetricKeys::NUM_QUERIES_REGISTERED =
     "impala-server.num-queries-registered";
+const char* ImpaladMetricKeys::BACKEND_NUM_QUERIES_EXECUTED =
+    "impala-server.backend-num-queries-executed";
+const char* ImpaladMetricKeys::BACKEND_NUM_QUERIES_EXECUTING =
+    "impala-server.backend-num-queries-executing";
 const char* ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS =
     "impala-server.num-fragments";
 const char* ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT =
@@ -128,9 +132,11 @@ const char* ImpaladMetricKeys::HEDGED_READ_OPS_WIN =
 // =======
 // Counters
 IntGauge* ImpaladMetrics::HASH_TABLE_TOTAL_BYTES = NULL;
+IntCounter* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTED = NULL;
+IntGauge* ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING = NULL;
+IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES = NULL;
 IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS = NULL;
 IntGauge* ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = NULL;
-IntCounter* ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES = NULL;
 IntCounter* ImpaladMetrics::NUM_QUERIES_EXPIRED = NULL;
 IntCounter* ImpaladMetrics::NUM_QUERIES_SPILLED = NULL;
 IntCounter* ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID = NULL;
@@ -249,10 +255,14 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::NUM_QUERIES_EXPIRED, 0);
   NUM_QUERIES_SPILLED = m->AddCounter(
       ImpaladMetricKeys::NUM_QUERIES_SPILLED, 0);
+  BACKEND_NUM_QUERIES_EXECUTED = m->AddCounter(
+      ImpaladMetricKeys::BACKEND_NUM_QUERIES_EXECUTED, 0);
+  BACKEND_NUM_QUERIES_EXECUTING = m->AddGauge(
+      ImpaladMetricKeys::BACKEND_NUM_QUERIES_EXECUTING, 0);
   IMPALA_SERVER_NUM_FRAGMENTS = m->AddCounter(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS, 0);
   IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT = m->AddGauge(
-      ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT, 0L);
+      ImpaladMetricKeys::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT, 0);
   IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS = m->AddGauge(
       ImpaladMetricKeys::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS, 0);
   IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS = m->AddGauge(

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index 36d9d1b..05babaa 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -46,6 +46,12 @@ class ImpaladMetricKeys {
   /// Number of fragments currently running on this server.
   static const char* IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT;
 
+  /// Number of queries that started executing on this backend.
+  static const char* BACKEND_NUM_QUERIES_EXECUTED;
+
+  /// Number of queries currently executing on this backend.
+  static const char* BACKEND_NUM_QUERIES_EXECUTING;
+
   /// Number of open HiveServer2 sessions
   static const char* IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS;
 
@@ -205,6 +211,12 @@ class ImpaladMetrics {
  public:
   // Counters
   static IntGauge* HASH_TABLE_TOTAL_BYTES;
+  static IntCounter* BACKEND_NUM_QUERIES_EXECUTED;
+  /// BACKEND_NUM_QUERIES_EXECUTING is used to determine when the backend has quiesced
+  /// and can be safely shut down without causing query failures. See IMPALA-7931 for
+  /// an example of a race that can occur if this is decremented before a query is
+  /// truly finished.
+  static IntGauge* BACKEND_NUM_QUERIES_EXECUTING;
   static IntCounter* IMPALA_SERVER_NUM_FRAGMENTS;
   static IntGauge* IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT;
   static IntCounter* IMPALA_SERVER_NUM_QUERIES;

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 821cdd8..5875257 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -747,6 +747,9 @@ struct TShutdownStatus {
   // Number of client requests still registered with the Impala server that is being shut
   // down.
   4: required i64 client_requests_registered
+
+  // Number of queries still executing on backend.
+  5: required i64 backend_queries_executing
 }
 
 struct TRemoteShutdownResult {

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index caa2ca0..19d6f44 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -380,6 +380,19 @@ error_codes = (
 
   ("THREAD_POOL_TASK_TIMED_OUT", 124,
    "$0 failed to finish before the $1 second timeout"),
+
+  ("UNREACHABLE_IMPALADS", 125, "Failed due to unreachable impalad(s): $0"),
+
+  ("INACTIVE_SESSION_EXPIRED", 126, "Session expired due to inactivity"),
+
+  ("INACTIVE_QUERY_EXPIRED", 127,
+   "Query $0 expired due to client inactivity (timeout is $1)"),
+
+  ("EXEC_TIME_LIMIT_EXCEEDED", 128, "Query $0 expired due to execution time limit of $1"),
+
+  ("CPU_LIMIT_EXCEEDED", 129, "Query $0 terminated due to CPU limit of $1"),
+
+  ("SCAN_BYTES_LIMIT_EXCEEDED", 130, "Query $0 terminated due to scan bytes limit of $1"),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index ac1d6ce..371372c 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -480,21 +480,41 @@
     "key": "impala-server.num-files-open-for-insert"
   },
   {
+    "description": "The total number of queries that executed on this backend over the life of the process.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Queries Executed On Backend",
+    "units": "UNIT",
+    "kind": "COUNTER",
+    "key": "impala-server.backend-num-queries-executed"
+  },
+  {
+    "description": "The number of queries currently executing on this backend.",
+    "contexts": [
+        "IMPALAD"
+    ],
+    "label": "Queries Executing On Backend",
+    "units": "UNIT",
+    "kind": "GAUGE",
+    "key": "impala-server.backend-num-queries-executing"
+  },
+  {
     "description": "The total number of query fragments processed over the life of the process.",
     "contexts": [
       "IMPALAD"
     ],
-    "label": "Query Fragments",
+    "label": "Query Fragment Instances",
     "units": "UNIT",
     "kind": "COUNTER",
     "key": "impala-server.num-fragments"
   },
   {
-    "description": "The number of query fragments currently executing.",
+    "description": "The number of query fragment instances currently executing.",
     "contexts": [
         "IMPALAD"
     ],
-    "label": "Query Fragments",
+    "label": "Query Fragment Instances Executing",
     "units": "UNIT",
     "kind": "GAUGE",
     "key": "impala-server.num-fragments-in-flight"

http://git-wip-us.apache.org/repos/asf/impala/blob/a91b24cb/tests/custom_cluster/test_restart_services.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index e441cbc..1b6911a 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -89,11 +89,12 @@ class TestRestart(CustomClusterTestSuite):
 
 def parse_shutdown_result(result):
   """Parse the shutdown result string and return the strings (grace left,
-  deadline left, fragment instances, queries registered)."""
+  deadline left, queries registered, queries executing)."""
   assert len(result.data) == 1
   summary = result.data[0]
   match = re.match(r'startup grace period left: ([0-9ms]*), deadline left: ([0-9ms]*), ' +
-      r'fragment instances: ([0-9]*), queries registered: ([0-9]*)', summary)
+      r'queries registered on coordinator: ([0-9]*), queries executing: ([0-9]*), ' +
+      r'fragment instances: [0-9]*', summary)
   assert match is not None, summary
   return match.groups()
 
@@ -101,6 +102,10 @@ def parse_shutdown_result(result):
 class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
   IDLE_SHUTDOWN_GRACE_PERIOD_S = 1
 
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--shutdown_grace_period_s={grace_period} \
@@ -169,8 +174,30 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
           --hostname={hostname}".format(grace_period=EXEC_SHUTDOWN_GRACE_PERIOD_S,
             deadline=EXEC_SHUTDOWN_DEADLINE_S, hostname=socket.gethostname()))
   def test_shutdown_executor(self):
-    """Test that shuts down and then restarts an executor. This should not disrupt any
-    queries that start after the shutdown or complete before the shutdown time limit."""
+    self.do_test_shutdown_executor(fetch_delay_s=0)
+
+  @pytest.mark.execute_serially
+  @SkipIfEC.scheduling
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--shutdown_grace_period_s={grace_period} \
+          --shutdown_deadline_s={deadline} \
+          --stress_status_report_delay_ms={status_report_delay_ms} \
+          --hostname={hostname}".format(grace_period=EXEC_SHUTDOWN_GRACE_PERIOD_S,
+            deadline=EXEC_SHUTDOWN_DEADLINE_S, status_report_delay_ms=5000,
+            hostname=socket.gethostname()))
+  def test_shutdown_executor_with_delay(self):
+    """Regression test for IMPALA-7931 that adds delays to status reporting and
+    to fetching of results to trigger races that previously resulted in query failures."""
+    print self.exploration_strategy
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip()
+    self.do_test_shutdown_executor(fetch_delay_s=5)
+
+  def do_test_shutdown_executor(self, fetch_delay_s):
+    """Implementation of test that shuts down and then restarts an executor. This should
+    not disrupt any queries that start after the shutdown or complete before the shutdown
+    time limit. The test is parameterized by 'fetch_delay_s', the amount to delay before
+    fetching from the query that must survive shutdown of an executor."""
     # Add sleeps to make sure that the query takes a couple of seconds to execute on the
     # executors.
     QUERY = "select count(*) from functional_parquet.alltypes where sleep(1) = bool_col"
@@ -186,11 +213,17 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     # all executors through the startup grace period without disruption.
     before_shutdown_handle = self.__exec_and_wait_until_running(QUERY)
 
+    # Run this query which simulates getting stuck in admission control until after
+    # the startup grace period expires. This exercises the code path where the
+    # coordinator terminates the query before it has started up.
+    before_shutdown_admission_handle = self.execute_query_async(QUERY,
+        {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
+
     # Shut down and wait for the shutdown state to propagate through statestore.
     result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
     assert parse_shutdown_result(result) == (
         "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S),
-        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "1", "0")
+        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "0", "1")
 
     # Check that the status is reflected on the debug page.
     web_json = self.cluster.impalads[1].service.get_debug_webpage_json("")
@@ -205,12 +238,22 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     # this query continue running through the full shutdown and restart cycle.
     after_shutdown_handle = self.__exec_and_wait_until_running(QUERY)
 
-    # Finish executing the first query before the backend exits.
-    assert self.__fetch_and_get_num_backends(QUERY, before_shutdown_handle) == 3
-
     # Wait for the impalad to exit, then start it back up and run another query, which
     # should be scheduled on it again.
     self.cluster.impalads[1].wait_for_exit()
+
+    # Finish fetching results from the first query (which will be buffered on the
+    # coordinator) after the backend exits. Add a delay before fetching to ensure
+    # that the query is not torn down on the coordinator when the failure is
+    # detected by the statestore (see IMPALA-7931).
+    assert self.__fetch_and_get_num_backends(
+        QUERY, before_shutdown_handle, delay_s=fetch_delay_s) == 3
+
+    # Confirm that the query stuck in admission failed.
+    self.__check_deadline_expired(QUERY, before_shutdown_admission_handle)
+
+    # Start the impalad back up and run another query, which should be scheduled on it
+    # again.
     self.cluster.impalads[1].start()
     self.impalad_test_service.wait_for_num_known_live_backends(
         3, timeout=30, interval=0.2, include_shutting_down=False)
@@ -225,7 +268,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
     result = self.execute_query_expect_success(self.client, SHUTDOWN_EXEC2)
     assert parse_shutdown_result(result) == (
         "{0}s000ms".format(self.EXEC_SHUTDOWN_GRACE_PERIOD_S),
-        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "1", "0")
+        "{0}s000ms".format(self.EXEC_SHUTDOWN_DEADLINE_S), "0", "1")
     self.cluster.impalads[1].wait_for_exit()
     self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)
 
@@ -252,9 +295,9 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
 
     result = self.execute_query_expect_success(
         self.client, SHUTDOWN_EXEC3.format(LOW_DEADLINE))
-    _, deadline, finstances, _ = parse_shutdown_result(result)
+    _, deadline, _, queries_executing = parse_shutdown_result(result)
     assert deadline == "{0}s000ms".format(LOW_DEADLINE)
-    assert int(finstances) > 0, "Slow query should still be running."
+    assert int(queries_executing) > 0, "Slow query should still be running."
     self.cluster.impalads[2].wait_for_exit()
     self.__check_deadline_expired(SLOW_QUERY, deadline_expiry_handle)
 
@@ -286,7 +329,7 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
 
     # Shut down the coordinator. Operations that start after this point should fail.
     result = self.execute_query_expect_success(self.client, SHUTDOWN)
-    grace, deadline, _, registered = parse_shutdown_result(result)
+    grace, deadline, registered, _ = parse_shutdown_result(result)
     assert grace == "{0}s000ms".format(self.COORD_SHUTDOWN_GRACE_PERIOD_S)
     assert deadline == "{0}m".format(self.COORD_SHUTDOWN_DEADLINE_S / 60), "4"
     assert registered == "3"
@@ -349,11 +392,14 @@ class TestShutdownCommand(CustomClusterTestSuite, HS2TestSuite):
                 self.client.QUERY_STATES['RUNNING'], timeout=20)
     return handle
 
-  def __fetch_and_get_num_backends(self, query, handle):
+  def __fetch_and_get_num_backends(self, query, handle, delay_s=0):
     """Fetch the results of 'query' from the beeswax handle 'handle', close the
     query and return the number of backends obtained from the profile."""
     self.impalad_test_service.wait_for_query_state(self.client, handle,
                 self.client.QUERY_STATES['FINISHED'], timeout=20)
+    if delay_s > 0:
+      LOG.info("sleeping for {0}s".format(delay_s))
+      time.sleep(delay_s)
     self.client.fetch(query, handle)
     profile = self.client.get_runtime_profile(handle)
     self.client.close_query(handle)


[2/2] impala git commit: IMPALA-6742: Profiles of running queries should include execution summary.

Posted by jo...@apache.org.
IMPALA-6742: Profiles of running queries should include execution summary.

Currently the execution summary is not included in the profiles of running
queries; it is only reported once the query has finished. This change adds
the execution summary to the profile reported while queries are still running.

Testing:
Added unit test.
Done with real cluster.

Change-Id: Idc7f714c9427d4b26d4e78cf27ceca2b0b336699
Reviewed-on: http://gerrit.cloudera.org:8080/11591
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e9652a48
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e9652a48
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e9652a48

Branch: refs/heads/master
Commit: e9652a48dd00c3c076ddccaa940d074b6970b7fc
Parents: a91b24c
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Mon Oct 29 21:54:15 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Mon Jan 7 17:55:48 2019 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc        | 23 ++++++++++++++++-------
 be/src/service/impala-server.h         |  3 +++
 tests/query_test/test_observability.py | 23 +++++++++++++++++++++++
 3 files changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e9652a48/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c7e52ae..86e40d3 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -664,6 +664,9 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
       lock_guard<mutex> l(*request_state->lock());
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
+      if (request_state->GetCoordinator() != nullptr) {
+        UpdateExecSummary(request_state);
+      }
       if (format == TRuntimeProfileFormat::BASE64) {
         RETURN_IF_ERROR(request_state->profile()->SerializeToArchiveString(output));
       } else if (format == TRuntimeProfileFormat::THRIFT) {
@@ -1110,6 +1113,18 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
   return Status::OK();
 }
 
+void ImpalaServer::UpdateExecSummary(
+    std::shared_ptr<ClientRequestState> request_state) const {
+  DCHECK(request_state->GetCoordinator() != nullptr);
+  TExecSummary t_exec_summary;
+  request_state->GetCoordinator()->GetTExecSummary(&t_exec_summary);
+  request_state->summary_profile()->SetTExecSummary(t_exec_summary);
+  string exec_summary = PrintExecSummary(t_exec_summary);
+  request_state->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
+  request_state->summary_profile()->AddInfoStringRedacted("Errors",
+      request_state->GetCoordinator()->GetErrorLog());
+}
+
 Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
     const Status* cause) {
   VLOG_QUERY << "UnregisterQuery(): query_id=" << PrintId(query_id);
@@ -1154,13 +1169,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
   }
 
   if (request_state->GetCoordinator() != nullptr) {
-    TExecSummary t_exec_summary;
-    request_state->GetCoordinator()->GetTExecSummary(&t_exec_summary);
-    request_state->summary_profile()->SetTExecSummary(t_exec_summary);
-    string exec_summary = PrintExecSummary(t_exec_summary);
-    request_state->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
-    request_state->summary_profile()->AddInfoStringRedacted("Errors",
-        request_state->GetCoordinator()->GetErrorLog());
+    UpdateExecSummary(request_state);
   }
 
   if (request_state->schedule() != nullptr) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e9652a48/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index d9b4b05..981df74 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -637,6 +637,9 @@ class ImpalaServer : public ImpalaServiceIf,
   Status GetExecSummary(const TUniqueId& query_id, const std::string& user,
       TExecSummary* result) WARN_UNUSED_RESULT;
 
+  /// Collect ExecSummary and update it to the profile in request_state
+  void UpdateExecSummary(std::shared_ptr<ClientRequestState> request_state) const;
+
   /// Initialize "default_configs_" to show the default values for ImpalaQueryOptions and
   /// "support_start_over/false" to indicate that Impala does not support start over
   /// in the fetch call.

http://git-wip-us.apache.org/repos/asf/impala/blob/e9652a48/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 7f4b2c8..fd44b80 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -216,6 +216,29 @@ class TestObservability(ImpalaTestSuite):
     # fetch an exec_summary.
     assert exec_summary is not None and exec_summary.nodes is not None
 
+  def test_exec_summary_in_runtime_profile(self):
+    """Test that the exec summary is populated in runtime profile correctly in every
+    query state"""
+    query = "select count(*) from functional.alltypes"
+    handle = self.execute_query_async(query,
+        {"debug_action": "CRS_BEFORE_ADMISSION:SLEEP@1000"})
+
+    # If ExecuteStatement() has completed and the query is paused in the admission control
+    # phase, then the coordinator has not started yet and exec_summary should be empty.
+    profile = self.client.get_runtime_profile(handle)
+    assert "ExecSummary:" not in profile, profile
+    # After completion of the admission control phase, the coordinator would have started
+    # and we should get a populated exec_summary.
+    self.client.wait_for_admission_control(handle)
+    profile = self.client.get_runtime_profile(handle)
+    assert "ExecSummary:" in profile, profile
+
+    self.client.fetch(query, handle)
+    # After fetching the results and reaching finished state, we should still be able to
+    # fetch an exec_summary in profile.
+    profile = self.client.get_runtime_profile(handle)
+    assert "ExecSummary:" in profile, profile
+
   @SkipIfLocal.multiple_impalad
   def test_profile_fragment_instances(self):
     """IMPALA-6081: Test that the expected number of fragment instances and their exec