Posted to commits@impala.apache.org by kw...@apache.org on 2018/08/08 07:26:03 UTC

[2/4] impala git commit: IMPALA-7163: Implement a state machine for the QueryState class

IMPALA-7163: Implement a state machine for the QueryState class

This patch adds a state machine to the QueryState class. The motivation
is to make the query lifecycle, as seen from an executor, much easier to
reason about. The patch is also a prerequisite for a follow-on patch for
IMPALA-2990, where status reporting will be per-query rather than
per-fragment-instance. For now the state machine serves no other
purpose; its main use will come with IMPALA-2990.

We introduce 5 possible states for the QueryState: 3 terminal states
(FINISHED, CANCELLED and ERROR) and 2 non-terminal states (PREPARING
and EXECUTING). Every transition from one state to the next is handled
by a single thread, the QueryState thread. After IMPALA-4063 this
thread will also send the periodic status updates, which is the primary
reason for making it the only thread that modifies the state of the
query.
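
The transition rule can be sketched as follows. This is a minimal,
self-contained illustration rather than the patch's code: QueryOutcome
and NextState() are hypothetical names standing in for the check that
UpdateBackendExecState() performs on 'query_status_'.

```cpp
#include <cassert>

// Mirrors the five states introduced by the patch.
enum class BackendExecState { PREPARING, EXECUTING, FINISHED, CANCELLED, ERROR };

// Hypothetical simplification of the query-wide Status object.
enum class QueryOutcome { OK, CANCELLED, FAILED };

// True for the three states from which no further transition is legal.
bool IsTerminalState(BackendExecState s) {
  return s == BackendExecState::FINISHED || s == BackendExecState::CANCELLED
      || s == BackendExecState::ERROR;
}

// Hypothetical helper: computes the state that follows 'current' given the
// overall outcome so far. Must not be called in a terminal state.
BackendExecState NextState(BackendExecState current, QueryOutcome outcome) {
  assert(!IsTerminalState(current));
  if (outcome == QueryOutcome::CANCELLED) return BackendExecState::CANCELLED;
  if (outcome == QueryOutcome::FAILED) return BackendExecState::ERROR;
  // No error: advance to the next state in the lifecycle.
  return current == BackendExecState::PREPARING ? BackendExecState::EXECUTING
                                                 : BackendExecState::FINISHED;
}
```

Because only one thread ever calls such a function, the state itself
needs no lock; only the status it reads does.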

Counting barriers are introduced to keep a count of how many fragment
instances have finished preparing and executing. Each barrier blocks
its waiter until all the fragment instances have finished the
respective phase. A fragment instance that hits an error updates the
query-wide status and, if the query is in the EXECUTING state, unblocks
the barrier immediately. In the PREPARING state the barrier is never
unblocked early: it blocks until all the fragment instances have
completed Prepare(), successfully or not, to maintain the invariant
that fragment instances cannot be cancelled until the entire QueryState
has finished PREPARING.
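
A minimal sketch of the barrier semantics described above, assuming a
plain mutex and condition variable. SimpleCountingBarrier is a
hypothetical stand-in and not the CountingBarrier from
util/counting-barrier.h; only the Notify(), NotifyRemaining() and
Wait() names are taken from the patch.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified barrier: Wait() returns once 'count' arrivals
// have been recorded, or once NotifyRemaining() has been called.
class SimpleCountingBarrier {
 public:
  explicit SimpleCountingBarrier(int count) : remaining_(count) {
    assert(count >= 0);
  }

  // Records one arrival and returns the number still outstanding.
  int Notify() {
    std::lock_guard<std::mutex> l(mu_);
    if (remaining_ > 0) --remaining_;
    if (remaining_ == 0) cv_.notify_all();
    return remaining_;
  }

  // Records all outstanding arrivals at once, releasing every waiter.
  void NotifyRemaining() {
    std::lock_guard<std::mutex> l(mu_);
    remaining_ = 0;
    cv_.notify_all();
  }

  // Blocks until no arrivals are outstanding.
  void Wait() {
    std::unique_lock<std::mutex> l(mu_);
    cv_.wait(l, [this] { return remaining_ == 0; });
  }

  // Number of arrivals still outstanding.
  int remaining() {
    std::lock_guard<std::mutex> l(mu_);
    return remaining_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int remaining_;
};
```

In these terms, the PREPARING phase only ever calls Notify(), once per
fragment instance whether it succeeded or failed, while an error in the
EXECUTING phase calls NotifyRemaining() so that the waiter wakes at once.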

The status reporting protocol has not been changed and remains exactly
as it was.

Testing:
- Added 3 failure points in the query lifecycle using debug actions
  and added tests to validate the same (extension of IMPALA-7376).
- Ran 'core' and 'exhaustive' tests.

Future related work:
1) IMPALA-2990: Make status reporting per-query.
2) Try to logically align the FIS states with the QueryState states.
3) Consider mirroring the QueryState state machine to
CoordinatorBackendState

Change-Id: Iec5670a7db83ecae4656d7bb2ea372d3767ba7fe
Reviewed-on: http://gerrit.cloudera.org:8080/10813
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cbc8c63e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cbc8c63e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cbc8c63e

Branch: refs/heads/master
Commit: cbc8c63ef0446550bd080e226b38307c4de967eb
Parents: 35bce6b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Wed Jun 20 20:15:30 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed Aug 8 00:16:18 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc              |   9 +-
 be/src/runtime/fragment-instance-state.cc  |  51 +++++----
 be/src/runtime/fragment-instance-state.h   |   8 --
 be/src/runtime/query-state.cc              | 134 ++++++++++++++++++-----
 be/src/runtime/query-state.h               | 136 ++++++++++++++++++++++--
 common/thrift/ImpalaInternalService.thrift |   4 +-
 tests/failure/test_failpoints.py           |  26 +++++
 7 files changed, 299 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1567c9a..4b87b69 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -144,14 +144,13 @@ Status Coordinator::Exec() {
     if (coord_instance_ == nullptr) {
       // at this point, the query is done with the Prepare phase, and we expect
       // to have a coordinator instance, but coord_instance_ == nullptr,
-      // which means we failed Prepare
-      Status prepare_status = query_state_->WaitForPrepare();
-      DCHECK(!prepare_status.ok());
-      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
+      // which means we failed before or during Prepare().
+      Status query_status = query_state_->WaitForPrepare();
+      DCHECK(!query_status.ok());
+      return UpdateExecState(query_status, nullptr, FLAGS_hostname);
     }
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
     // done and the FragmentInstanceState's root sink will be set up.
-    DCHECK(coord_instance_->IsPrepared() && coord_instance_->WaitForPrepare().ok());
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index cbae601..51ff13d 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -71,13 +71,18 @@ FragmentInstanceState::FragmentInstanceState(
 }
 
 Status FragmentInstanceState::Exec() {
+  bool is_prepared = false;
   Status status = Prepare();
   DCHECK(runtime_state_ != nullptr);  // we need to guarantee at least that
-  discard_result(prepared_promise_.Set(status));
+
   if (!status.ok()) {
     discard_result(opened_promise_.Set(status));
     goto done;
   }
+  // Tell the managing 'QueryState' that we're done with Prepare().
+  query_state_->DonePreparing();
+  is_prepared = true;
+
   status = Open();
   discard_result(opened_promise_.Set(status));
   if (!status.ok()) goto done;
@@ -90,7 +95,23 @@ Status FragmentInstanceState::Exec() {
     status = ExecInternal();
   }
 
+  if (!status.ok()) goto done;
+  // Tell the managing 'QueryState' that we're done with executing and that we've stopped
+  // the reporting thread.
+  query_state_->DoneExecuting();
+
 done:
+  if (!status.ok()) {
+    if (!is_prepared) {
+      DCHECK_LE(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      // Tell the managing 'QueryState' that we hit an error during Prepare().
+      query_state_->ErrorDuringPrepare(status, instance_id());
+    } else {
+      DCHECK_GT(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
+      // Tell the managing 'QueryState' that we hit an error during execution.
+      query_state_->ErrorDuringExecute(status, instance_id());
+    }
+  }
   UpdateState(StateEvent::EXEC_END);
   // call this before Close() to make sure the thread token got released
   Finalize(status);
@@ -99,10 +120,6 @@ done:
 }
 
 void FragmentInstanceState::Cancel() {
-  // Make sure Prepare() finished. We don't care about the status since the query is
-  // being cancelled.
-  discard_result(WaitForPrepare());
-
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
   if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
@@ -110,7 +127,7 @@ void FragmentInstanceState::Cancel() {
 }
 
 Status FragmentInstanceState::Prepare() {
-  DCHECK(!prepared_promise_.IsSet());
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_EXEC);
   VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_);
 
   // Do not call RETURN_IF_ERROR or explicitly return before this line,
@@ -236,8 +253,8 @@ Status FragmentInstanceState::Prepare() {
 }
 
 Status FragmentInstanceState::Open() {
-  DCHECK(prepared_promise_.IsSet());
   DCHECK(!opened_promise_.IsSet());
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_PREPARE);
   SCOPED_TIMER(profile()->total_time_counter());
   SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -266,6 +283,9 @@ Status FragmentInstanceState::Open() {
 
   {
     UpdateState(StateEvent::OPEN_START);
+    // Inject failure if debug actions are enabled.
+    RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_OPEN"));
+
     SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
     RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
   }
@@ -273,6 +293,10 @@ Status FragmentInstanceState::Open() {
 }
 
 Status FragmentInstanceState::ExecInternal() {
+  DCHECK_EQ(current_state_.Load(), TFInstanceExecState::WAITING_FOR_OPEN);
+  // Inject failure if debug actions are enabled.
+  RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_EXEC_INTERNAL"));
+
   RuntimeProfile::Counter* plan_exec_timer =
       ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -484,14 +508,6 @@ void FragmentInstanceState::ReleaseThreadToken() {
   }
 }
 
-Status FragmentInstanceState::WaitForPrepare() {
-  return prepared_promise_.Get();
-}
-
-bool FragmentInstanceState::IsPrepared() {
-  return prepared_promise_.IsSet();
-}
-
 Status FragmentInstanceState::WaitForOpen() {
   return opened_promise_.Get();
 }
@@ -499,8 +515,6 @@ Status FragmentInstanceState::WaitForOpen() {
 void FragmentInstanceState::PublishFilter(const TPublishFilterParams& params) {
   VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
             << " filter_id=" << params.filter_id;
-  // Wait until Prepare() is done, so we know that the filter bank is set up.
-  if (!WaitForPrepare().ok()) return;
   runtime_state_->filter_bank()->PublishGlobalFilter(params);
 }
 
@@ -508,14 +522,15 @@ string FragmentInstanceState::ExecStateToString(const TFInstanceExecState::type
   // Labels to send to the debug webpages to display the current state to the user.
   static const string finstance_state_labels[] = {
       "Waiting for Exec",         // WAITING_FOR_EXEC
-      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
       "Waiting for Prepare",      // WAITING_FOR_PREPARE
+      "Waiting for Codegen",      // WAITING_FOR_CODEGEN
       "Waiting for First Batch",  // WAITING_FOR_OPEN
       "Waiting for First Batch",  // WAITING_FOR_FIRST_BATCH
       "First batch produced",     // FIRST_BATCH_PRODUCED
       "Producing Data",           // PRODUCING_DATA
       "Last batch sent",          // LAST_BATCH_SENT
       "Finished"                  // FINISHED
+
   };
   /// Make sure we have a label for every possible state.
   static_assert(

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index df27f9c..d1f21f5 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -76,8 +76,6 @@ class RuntimeState;
 ///
 /// TODO:
 /// - absorb RuntimeState?
-/// - should WaitForPrepare/Open() return the overall execution status, if there
-///   was a failure?
 class FragmentInstanceState {
  public:
   FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
@@ -91,12 +89,6 @@ class FragmentInstanceState {
   /// Cancels execution and sends a final status report. Idempotent.
   void Cancel();
 
-  /// Blocks until the Prepare phase of Exec() is finished and returns the status.
-  Status WaitForPrepare();
-
-  /// Returns true if the Prepare phase of Exec() is finished.
-  bool IsPrepared();
-
   /// Blocks until the Prepare phase of Exec() is finished and the exec tree is
   /// opened, and returns that status. If the preparation phase encountered an error,
   /// GetOpenStatus() will return that error without blocking.

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index c86095c..2ae4a27 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -138,6 +138,11 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
   rpc_params_.__isset.fragment_instance_ctxs = true;
 
+  instances_prepared_barrier_.reset(
+      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+  instances_finished_barrier_.reset(
+      new CountingBarrier(rpc_params_.fragment_instance_ctxs.size()));
+
   // Claim the query-wide minimum reservation. Do this last so that we don't need
   // to handle releasing it if a later step fails.
   initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_,
@@ -193,9 +198,49 @@ Status QueryState::InitBufferPoolState() {
   return Status::OK();
 }
 
+const char* QueryState::BackendExecStateToString(const BackendExecState& state) {
+  static const unordered_map<BackendExecState, const char*> exec_state_to_str{
+      {BackendExecState::PREPARING, "PREPARING"},
+      {BackendExecState::EXECUTING, "EXECUTING"},
+      {BackendExecState::FINISHED, "FINISHED"},
+      {BackendExecState::CANCELLED, "CANCELLED"},
+      {BackendExecState::ERROR, "ERROR"}};
+
+  return exec_state_to_str.at(state);
+}
+
+inline bool QueryState::IsTerminalState(const BackendExecState& state) {
+  return state == BackendExecState::FINISHED
+      || state == BackendExecState::CANCELLED
+      || state == BackendExecState::ERROR;
+}
+
+Status QueryState::UpdateBackendExecState() {
+  BackendExecState old_state = backend_exec_state_;
+
+  unique_lock<SpinLock> l(status_lock_);
+  // We shouldn't call this function if we're already in a terminal state.
+  DCHECK(!IsTerminalState(backend_exec_state_))
+      << " Current State: " << BackendExecStateToString(backend_exec_state_)
+      << " | Current Status: " << query_status_.GetDetail();
+
+  if (query_status_.IsCancelled()) {
+    // Received cancellation - go to CANCELLED state.
+    backend_exec_state_ = BackendExecState::CANCELLED;
+  } else if (!query_status_.ok()) {
+    // Error while executing - go to ERROR state.
+    backend_exec_state_ = BackendExecState::ERROR;
+  } else {
+    // Transition to the next state in the lifecycle.
+    backend_exec_state_ = old_state == BackendExecState::PREPARING ?
+        BackendExecState::EXECUTING : BackendExecState::FINISHED;
+  }
+  return query_status_;
+}
+
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  if (!instances_prepared_promise_.Get().ok()) return nullptr;
+  if (!WaitForPrepare().ok()) return nullptr;
   auto it = fis_map_.find(instance_id);
   return it != fis_map_.end() ? it->second : nullptr;
 }
@@ -211,7 +256,6 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   DCHECK(status.ok() || done);
   // if this is not for a specific fragment instance, we're reporting an error
   DCHECK(fis != nullptr || !status.ok());
-  DCHECK(fis == nullptr || fis->IsPrepared());
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
@@ -293,7 +337,17 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
 }
 
 Status QueryState::WaitForPrepare() {
-  return instances_prepared_promise_.Get();
+  instances_prepared_barrier_->Wait();
+
+  unique_lock<SpinLock> l(status_lock_);
+  return query_status_;
+}
+
+Status QueryState::WaitForFinish() {
+  instances_finished_barrier_->Wait();
+
+  unique_lock<SpinLock> l(status_lock_);
+  return query_status_;
 }
 
 void QueryState::StartFInstances() {
@@ -306,7 +360,12 @@ void QueryState::StartFInstances() {
   DCHECK(query_ctx().__isset.desc_tbl);
   Status status = DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_);
   if (!status.ok()) {
-    discard_result(instances_prepared_promise_.Set(status));
+    ErrorDuringPrepare(status, TUniqueId());
+    Status updated_query_status = UpdateBackendExecState();
+    instances_prepared_barrier_->NotifyRemaining();
+    DCHECK(!updated_query_status.ok());
+    // TODO (IMPALA-4063): This call to ReportExecStatusAux() should internally be handled
+    // by UpdateBackendExecState().
     ReportExecStatusAux(true, status, nullptr, false);
     return;
   }
@@ -317,6 +376,7 @@ void QueryState::StartFInstances() {
   DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
   TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
+  int num_unstarted_instances = rpc_params_.fragment_instance_ctxs.size();
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
     // determine corresponding TPlanFragmentCtx
@@ -333,13 +393,23 @@ void QueryState::StartFInstances() {
     // start new thread to execute instance
     refcnt_.Add(1); // decremented in ExecFInstance()
     AcquireExecResourceRefcount(); // decremented in ExecFInstance()
+
+    // Add the fragment instance ID to the 'fis_map_'.
+    fis_map_.emplace(fis->instance_id(), fis);
+
     string thread_name = Substitute("$0 (finst:$1)",
         FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
         PrintId(instance_ctx.fragment_instance_id));
     unique_ptr<Thread> t;
-    thread_create_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
-        thread_name, [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+
+    // Inject thread creation failures through debug actions if enabled.
+    Status debug_action_status = DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
+    thread_create_status = debug_action_status.ok() ?
+        Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+            [this, fis]() { this->ExecFInstance(fis); }, &t, true) :
+        debug_action_status;
     if (!thread_create_status.ok()) {
+      fis_map_.erase(fis->instance_id());
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
@@ -347,33 +417,45 @@ void QueryState::StartFInstances() {
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
-    // Fragment instance successfully started
-    fis_map_.emplace(fis->instance_id(), fis);
     // update fragment_map_
     vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
     fis_list.push_back(fis);
     t->Detach();
+    --num_unstarted_instances;
   }
 
-  // don't return until every instance is prepared and record the first non-OK
-  // (non-CANCELLED if available) status (including any error from thread creation
-  // above).
-  Status prepare_status = thread_create_status;
-  for (auto entry: fis_map_) {
-    Status instance_status = entry.second->WaitForPrepare();
-    // don't wipe out an error in one instance with the resulting CANCELLED from
-    // the remaining instances
-    if (!instance_status.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) {
-      prepare_status = instance_status;
-    }
-  }
-  discard_result(instances_prepared_promise_.Set(prepare_status));
-  // If this is aborting due to failure in thread creation, report status to the
-  // coordinator to start query cancellation. (Other errors are reported by the
-  // fragment instance itself.)
   if (!thread_create_status.ok()) {
+    // We failed to start 'num_unstarted_instances', so make sure to notify
+    // 'instances_prepared_barrier_' 'num_unstarted_instances - 1' times, to unblock
+    // WaitForPrepare(). The last remaining notification will be set by the call to
+    // ErrorDuringPrepare() below.
+    while (num_unstarted_instances > 1) {
+      DonePreparing();
+      --num_unstarted_instances;
+    }
+
+    // We prioritize thread creation failure as a query killing error, even over an error
+    // during Prepare() for a FIS.
+    // We have to notify anyone waiting on WaitForPrepare() that this query has failed.
+    ErrorDuringPrepare(thread_create_status, TUniqueId());
+    Status updated_query_status = UpdateBackendExecState();
+    DCHECK(!updated_query_status.ok());
+    // Block until all the already started fragment instances finish Prepare()-ing to
+    // report an error.
+    discard_result(WaitForPrepare());
     ReportExecStatusAux(true, thread_create_status, nullptr, true);
+    return;
   }
+
+  discard_result(WaitForPrepare());
+  if (!UpdateBackendExecState().ok()) return;
+  DCHECK(backend_exec_state_ == BackendExecState::EXECUTING)
+      << BackendExecStateToString(backend_exec_state_);
+
+  discard_result(WaitForFinish());
+  if (!UpdateBackendExecState().ok()) return;
+  DCHECK(backend_exec_state_ == BackendExecState::FINISHED)
+      << BackendExecStateToString(backend_exec_state_);
 }
 
 void QueryState::AcquireExecResourceRefcount() {
@@ -414,13 +496,13 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
 
 void QueryState::Cancel() {
   VLOG_QUERY << "Cancel: query_id=" << PrintId(query_id());
-  (void) instances_prepared_promise_.Get();
+  discard_result(WaitForPrepare());
   if (!is_cancelled_.CompareAndSwap(0, 1)) return;
   for (auto entry: fis_map_) entry.second->Cancel();
 }
 
 void QueryState::PublishFilter(const TPublishFilterParams& params) {
-  if (!instances_prepared_promise_.Get().ok()) return;
+  if (!WaitForPrepare().ok()) return;
   DCHECK_EQ(fragment_map_.count(params.dst_fragment_idx), 1);
   for (FragmentInstanceState* fis : fragment_map_[params.dst_fragment_idx]) {
     fis->PublishFilter(params);

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index ae3bdd5..607d82a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -19,6 +19,7 @@
 #define IMPALA_RUNTIME_QUERY_STATE_H
 
 #include <memory>
+#include <mutex>
 #include <unordered_map>
 #include <boost/scoped_ptr.hpp>
 
@@ -27,8 +28,9 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/uid-util.h"
+#include "util/counting-barrier.h"
 #include "util/promise.h"
+#include "util/uid-util.h"
 
 namespace impala {
 
@@ -61,6 +63,19 @@ class RuntimeState;
 /// When any fragment instance execution returns with an error status, all
 /// fragment instances are automatically cancelled.
 ///
+/// We maintain a state denoted by BackendExecState. We transition from one non-error
+/// state to the next only if *all* underlying fragment instances have done so.
+/// Eg: We transition from the PREPARING state to the EXECUTING state only if *all* the
+/// underlying fragment instances have finished Prepare().
+/// However, the behavior for transitioning from a non-error state to an error state is
+/// different for different states. If any fragment instance hits an error or cancellation
+/// during the EXECUTING state, then we immediately change the state of the query to the
+/// ERROR or CANCELLED state accordingly.
+/// However, if a fragment instance hits an error during Prepare(), we still wait for
+/// *all* fragment instances to complete preparing before transitioning to the ERROR
+/// state. This is to simplify the query lifecycle so that Prepare() is always completed
+/// before it can handle either a Cancel() RPC or a PublishFilter() RPC.
+///
 /// Status reporting: all instances currently report their status independently.
 /// Each instance sends at least one final status report with its overall execution
 /// status, so if any of the instances encountered an error, that error will be reported.
@@ -130,15 +145,10 @@ class QueryState {
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
-  /// thread. Blocks until all fragment instances have finished their Prepare phase.
-  /// Not idempotent, not thread-safe.
+  /// thread. Blocks until a terminal state has been reached.
+  /// Not idempotent, not thread-safe. Must only be called by the QueryState thread.
   void StartFInstances();
 
-  /// Return overall status of Prepare phases of fragment instances. A failure
-  /// in any instance's Prepare will cause this function to return an error status.
-  /// Blocks until all fragment instances have finished their Prepare phase.
-  Status WaitForPrepare();
-
   /// Blocks until all fragment instances have finished their Prepare phase.
   FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
 
@@ -176,6 +186,48 @@ class QueryState {
 
   ~QueryState();
 
+  /// Return overall status of Prepare() phases of fragment instances. A failure
+  /// in any instance's Prepare() will cause this function to return an error status.
+  /// Blocks until all fragment instances have finished their Prepare() phase.
+  Status WaitForPrepare();
+
+  /// Called by a FragmentInstanceState thread to notify that it's done preparing.
+  void DonePreparing() { discard_result(instances_prepared_barrier_->Notify()); }
+
+  /// Called by a FragmentInstanceState thread to notify that it's done executing.
+  void DoneExecuting() { discard_result(instances_finished_barrier_->Notify()); }
+
+  /// Called by a fragment instance thread to notify that it hit an error during Prepare()
+  /// Updates the query status and the failed instance ID if it's not set already.
+  /// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
+  /// fragment instance to complete Prepare().
+  void ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
+    // Do a racy check to avoid getting the lock if an error is already set.
+    if (query_status_.ok()) {
+      std::unique_lock<SpinLock> l(status_lock_);
+      if (query_status_.ok()) {
+        query_status_ = status;
+        failed_finstance_id_ = finst_id;
+      }
+    }
+    discard_result(instances_prepared_barrier_->Notify());
+  }
+
+  /// Called by a fragment instance thread to notify that it hit an error during Execute()
+  /// Updates the query status and records the failed instance ID if they're not set
+  /// already. Also notifies anyone waiting on WaitForFinish().
+  void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id) {
+    // Do a racy check to avoid getting the lock if an error is already set.
+    if (query_status_.ok()) {
+      std::unique_lock<SpinLock> l(status_lock_);
+      if (query_status_.ok()) {
+        query_status_ = status;
+        failed_finstance_id_ = finst_id;
+      }
+    }
+    instances_finished_barrier_->NotifyRemaining();
+  }
+
  private:
   friend class QueryExecMgr;
 
@@ -185,6 +237,60 @@ class QueryState {
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
+  /// Return overall status of all fragment instances during execution. A failure
+  /// in any instance's execution (after Prepare()) will cause this function
+  /// to return an error status. Blocks until all fragment instances have finished
+  /// executing or until one of them hits an error.
+  Status WaitForFinish();
+
+  /// States that a query goes through during its lifecycle.
+  enum class BackendExecState {
+    /// PREPARING: The initial state on receiving an ExecQueryFInstances() RPC from the
+    /// coordinator. Implies that the fragment instances are being started.
+    PREPARING,
+    /// EXECUTING: All fragment instances managed by this QueryState have successfully
+    /// completed Prepare(). Implies that the query is executing.
+    EXECUTING,
+    /// FINISHED: All fragment instances managed by this QueryState have successfully
+    /// completed executing.
+    FINISHED,
+    /// CANCELLED: This query received a CancelQueryFInstances() RPC or was directed by
+    /// the coordinator to cancel itself from a response to a ReportExecStatus() RPC.
+    /// Does not imply that all the fragment instances have realized cancellation however.
+    CANCELLED,
+    /// ERROR: received an error from a fragment instance.
+    ERROR
+  };
+
+  /// Current state of this query in this executor.
+  /// Thread-safety: Only updated by the QueryState thread.
+  BackendExecState backend_exec_state_ = BackendExecState::PREPARING;
+
+  /// Updates the BackendExecState based on 'query_status_'. A state transition happens
+  /// if the current state is a non-terminal state; the transition can either be to the
+  /// next legal state or ERROR if 'query_status_' is an error. Thread safe. This is a
+  /// helper function to StartFInstances() which executes on the QueryState thread.
+  Status UpdateBackendExecState();
+
+  /// A string representation of 'state'.
+  const char* BackendExecStateToString(const BackendExecState& state);
+
+  /// Returns 'true' if 'state' is a terminal state (FINISHED, CANCELLED, ERROR).
+  inline bool IsTerminalState(const BackendExecState& state);
+
+  /// Protects 'query_status_' and 'failed_finstance_id_'.
+  SpinLock status_lock_;
+
+  /// The overall status of this QueryState.
+  /// Status::OK if all the fragment instances managed by this QS are also Status::OK;
+  /// Otherwise, it will reflect the first non-OK status of a FIS.
+  /// Protected by 'status_lock_'.
+  Status query_status_;
+
+  /// ID of first fragment instance to hit an error.
+  /// Protected by 'status_lock_'.
+  TUniqueId failed_finstance_id_;
+
   /// set in c'tor
   const TQueryCtx query_ctx_;
 
@@ -216,9 +322,17 @@ class QueryState {
   /// created in StartFInstances(), owned by obj_pool_
   DescriptorTbl* desc_tbl_ = nullptr;
 
-  /// Barrier for the completion of the Prepare phases of all fragment instances,
-  /// set in StartFInstances().
-  Promise<Status> instances_prepared_promise_;
+  /// Barrier for the completion of the Prepare() phases of all fragment instances. This
+  /// just blocks until ALL fragment instances have finished preparing, regardless of
+  /// whether they hit an error or not.
+  std::unique_ptr<CountingBarrier> instances_prepared_barrier_;
+
+  /// Barrier for the completion of all the fragment instances.
+  /// If the 'Status' is not OK due to an error during fragment instance execution, this
+  /// barrier is unblocked immediately.
+  /// 'query_status_' will be set once this is unblocked and so will
+  /// 'failed_finstance_id_' if an error is hit.
+  std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
   /// map from instance id to its state (owned by obj_pool_), populated in
   /// StartFInstances(); not valid to read from until instances_prepare_promise_

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index c22b662..80da2a9 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -624,10 +624,12 @@ struct TErrorLogEntry {
 // Represents the states that a fragment instance goes through during its execution. The
 // current state gets sent back to the coordinator and will be presented to users through
 // the debug webpages.
+// The states are listed in order and one state will only strictly be reached after all
+// the previous states.
 enum TFInstanceExecState {
   WAITING_FOR_EXEC,
-  WAITING_FOR_CODEGEN,
   WAITING_FOR_PREPARE,
+  WAITING_FOR_CODEGEN,
   WAITING_FOR_OPEN,
   WAITING_FOR_FIRST_BATCH,
   FIRST_BATCH_PRODUCED,

http://git-wip-us.apache.org/repos/asf/impala/blob/cbc8c63e/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 278c2f1..ef9ed07 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -146,6 +146,32 @@ class TestFailpoints(ImpalaTestSuite):
     self.execute_query_expect_failure(self.client, query,
         query_options={'debug_action':debug_action})
 
+    # Fail the Open() phase of all fragment instances.
+    debug_action = 'FIS_IN_OPEN:FAIL@1.0'
+    self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action': debug_action})
+
+    # Fail the ExecInternal() phase of all fragment instances.
+    debug_action = 'FIS_IN_EXEC_INTERNAL:FAIL@1.0'
+    self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action': debug_action})
+
+    # Fail the fragment instance thread creation with a 0.5 probability.
+    debug_action = 'FIS_FAIL_THREAD_CREATION:FAIL@0.5'
+
+    # We want to test the behavior when only some fragment instance threads fail to be
+    # created, so we set the probability of fragment instance thread creation failure to
+    # 0.5. Since there's only a 50% chance of fragment instance thread creation failure,
+    # we attempt to catch a query failure up to a very conservative maximum of 50 tries.
+    for i in range(50):
+      try:
+        self.execute_query(query,
+            query_options={'debug_action': debug_action})
+      except ImpalaBeeswaxException as e:
+        assert 'Query aborted:Debug Action: FIS_FAIL_THREAD_CREATION:FAIL@0.5' \
+            in str(e), str(e)
+        break
+
   def __execute_fail_action(self, query, vector):
     try:
       self.execute_query(query, vector.get_value('exec_option'),