You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/15 07:19:55 UTC

impala git commit: IMPALA-5384, part 2: Simplify Coordinator locking and clarify state

Repository: impala
Updated Branches:
  refs/heads/2.x 22244bb07 -> da329442a


IMPALA-5384, part 2: Simplify Coordinator locking and clarify state

The is the final change to clarify and break up the Coordinator's lock.
The state machine for the coordinator is made explicit, distinguishing
between executing state and multiple terminal states. Logic to
transition into a terminal state is centralized in one location and
executes exactly once for each coordinator object.

Derived from a patch for IMPALA_5384 by Marcel Kornacker.

Testing:
- exhaustive functional tests
- stress test on minicluster with memory overcommitment. Verified from
  the logs that this exercises all these paths:
  - successful queries
  - client requested cancellation
  - error from exec FInstances RPC
  - error reported asynchronously via report status RPC
  - eos before backend execution completed

Change-Id: I1abdfd02163f9356c59d470fe1c64ebe012a9e8e
Reviewed-on: http://gerrit.cloudera.org:8080/10158
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10389


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/da329442
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/da329442
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/da329442

Branch: refs/heads/2.x
Commit: da329442a598c29c97b9f43964c1b2af263c8391
Parents: 22244bb
Author: Dan Hecht <dh...@cloudera.com>
Authored: Fri Apr 13 16:51:25 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Mon May 14 22:00:38 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.h |   8 +
 be/src/runtime/coordinator.cc              | 424 +++++++++++-------------
 be/src/runtime/coordinator.h               | 333 ++++++++++---------
 be/src/service/client-request-state.cc     |   2 +-
 be/src/service/impala-server.h             |   5 -
 be/src/util/counting-barrier.h             |  21 +-
 6 files changed, 397 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index d2f122c..e7af2e2 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,9 +21,17 @@
 #include <vector>
 #include <unordered_set>
 
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "runtime/coordinator.h"
+#include "scheduling/query-schedule.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d6a70e7..db07a3f 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,6 +25,7 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
+#include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -39,6 +40,7 @@
 #include "runtime/query-state.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
+#include "scheduling/query-schedule.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -51,16 +53,13 @@
 
 using namespace apache::thrift;
 using namespace rapidjson;
-using namespace strings;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::filesystem::path;
-using std::unique_ptr;
 
-DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -76,11 +75,9 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(released_exec_resources_)
-      << "ReleaseExecResources() must be called before Coordinator is destroyed";
-  DCHECK(released_admission_control_resources_)
-      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
-      << "destroyed";
+  // Must have entered a terminal exec state guaranteeing resources were released.
+  DCHECK_NE(exec_state_, ExecState::EXECUTING);
+  // Release the coordinator's reference to the query control structures.
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -109,12 +106,6 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
-  // to keep things simple, make async Cancel() calls wait until plan fragment
-  // execution has been initiated, otherwise we might try to cancel fragment
-  // execution at Impala daemons where it hasn't even started
-  // TODO: revisit this, it may not be true anymore
-  lock_guard<mutex> l(lock_);
-
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
   query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -138,9 +129,9 @@ Status Coordinator::Exec() {
     InitFilterRoutingTable();
   }
 
-  // At this point, all static setup is done and all structures are initialized.
-  // Only runtime-related state changes past this point (examples:
-  // num_remaining_backends_, fragment instance profiles, etc.)
+  // At this point, all static setup is done and all structures are initialized. Only
+  // runtime-related state changes past this point (examples: fragment instance
+  // profiles, etc.)
 
   StartBackendExec();
   RETURN_IF_ERROR(FinishBackendStartup());
@@ -155,7 +146,7 @@ Status Coordinator::Exec() {
       // which means we failed Prepare
       Status prepare_status = query_state_->WaitForPrepare();
       DCHECK(!prepare_status.ok());
-      return prepare_status;
+      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
 
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -169,7 +160,6 @@ Status Coordinator::Exec() {
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
-
   return Status::OK();
 }
 
@@ -208,6 +198,8 @@ void Coordinator::InitFragmentStats() {
 void Coordinator::InitBackendStates() {
   int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
+
+  lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
   RuntimeProfile::Counter* num_backends_counter =
@@ -215,19 +207,13 @@ void Coordinator::InitBackendStates() {
   num_backends_counter->Set(num_backends);
 
   // create BackendStates
-  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
-  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
-    if (has_coord_fragment && coord_address == entry.first) {
-      coord_backend_idx_ = backend_idx;
-    }
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(query_id(), backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
-  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -341,8 +327,8 @@ void Coordinator::InitFilterRoutingTable() {
 
 void Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
-  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
-  num_remaining_backends_ = num_backends;
+  exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
+  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
 
   DebugOptions debug_options(schedule_.query_options());
 
@@ -354,11 +340,11 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
-            exec_complete_barrier_.get());
+              exec_rpcs_complete_barrier_.get());
         });
   }
+  exec_rpcs_complete_barrier_->Wait();
 
-  exec_complete_barrier_->Wait();
   VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(
@@ -367,26 +353,24 @@ void Coordinator::StartBackendExec() {
 }
 
 Status Coordinator::FinishBackendStartup() {
-  Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
   HistogramMetric latencies(def, 30 * 60 * 1000, 4);
+  Status status = Status::OK();
+  string error_hostname;
   for (BackendState* backend_state: backend_states_) {
     // preserve the first non-OK, if there is one
     Status backend_status = backend_state->GetStatus();
-    if (!backend_status.ok() && status.ok()) status = backend_status;
+    if (!backend_status.ok() && status.ok()) {
+      status = backend_status;
+      error_hostname = backend_state->impalad_address().hostname;
+    }
     latencies.Update(backend_state->rpc_latency());
   }
-
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
-
-  if (!status.ok()) {
-    query_status_ = status;
-    CancelInternal();
-  }
-  return status;
+  return UpdateExecState(status, nullptr, error_hostname);
 }
 
 string Coordinator::FilterDebugString() {
@@ -446,40 +430,115 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-Status Coordinator::GetStatus() {
-  lock_guard<mutex> l(lock_);
-  return query_status_;
+const char* Coordinator::ExecStateToString(const ExecState state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_str{
+    {ExecState::EXECUTING,        "EXECUTING"},
+    {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
+    {ExecState::CANCELLED,        "CANCELLED"},
+    {ExecState::ERROR,            "ERROR"}};
+  return exec_state_to_str.at(state);
 }
 
-Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
-    bool is_fragment_failure, const TUniqueId& instance_id) {
+Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
+  DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
+  Status ret_status;
   {
-    lock_guard<mutex> l(lock_);
-
-    // The query is done and we are just waiting for backends to clean up.
-    // Ignore their cancelled updates.
-    if (returned_all_results_ && status.IsCancelled()) return query_status_;
-
-    // nothing to update
-    if (status.ok()) return query_status_;
-
-    // don't override an error status; also, cancellation has already started
-    if (!query_status_.ok()) return query_status_;
-
-    query_status_ = status;
-    CancelInternal();
-  }
-
-  if (is_fragment_failure) {
-    // Log the id of the fragment that first failed so we can track it down more easily.
-    VLOG_QUERY << "query_id=" << PrintId(query_id())
-               << " failed because fragment_instance_id=" << PrintId(instance_id)
-               << " on host=" << backend_hostname << " failed.";
+    lock_guard<SpinLock> l(exec_state_lock_);
+    // May have already entered a terminal state, in which case nothing to do.
+    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
+    DCHECK(exec_status_.ok()) << exec_status_;
+    exec_state_ = state;
+    if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
+    ret_status = exec_status_;
+  }
+  VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
+      state == ExecState::CANCELLED ? "cancelled" : "completed");
+  HandleExecStateTransition(ExecState::EXECUTING, state);
+  return ret_status;
+}
+
+Status Coordinator::UpdateExecState(const Status& status,
+    const TUniqueId* failed_finst, const string& instance_hostname) {
+  Status ret_status;
+  ExecState old_state, new_state;
+  {
+    lock_guard<SpinLock> l(exec_state_lock_);
+    old_state = exec_state_;
+    if (old_state == ExecState::EXECUTING) {
+      DCHECK(exec_status_.ok()) << exec_status_;
+      if (!status.ok()) {
+        // Error while executing - go to ERROR state.
+        exec_status_ = status;
+        exec_state_ = ExecState::ERROR;
+      }
+    } else if (old_state == ExecState::RETURNED_RESULTS) {
+      // Already returned all results. Leave exec status as ok, stay in this state.
+      DCHECK(exec_status_.ok()) << exec_status_;
+    } else if (old_state == ExecState::CANCELLED) {
+      // Client requested cancellation already, stay in this state.  Ignores errors
+      // after requested cancellations.
+      DCHECK(exec_status_.IsCancelled()) << exec_status_;
+    } else {
+      // Already in the ERROR state, stay in this state but update status to be the
+      // first non-cancelled status.
+      DCHECK_EQ(old_state, ExecState::ERROR);
+      DCHECK(!exec_status_.ok());
+      if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
+        exec_status_ = status;
+      }
+    }
+    new_state = exec_state_;
+    ret_status = exec_status_;
+  }
+  // Log interesting status: a non-cancelled error or a cancellation if was executing.
+  if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
+    VLOG_QUERY << Substitute(
+        "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
+        PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
+        instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
+        status.GetDetail());
+  }
+  // After dropping the lock, apply the state transition (if any) side-effects.
+  HandleExecStateTransition(old_state, new_state);
+  return ret_status;
+}
+
+bool Coordinator::ReturnedAllResults() {
+  lock_guard<SpinLock> l(exec_state_lock_);
+  return exec_state_ == ExecState::RETURNED_RESULTS;
+}
+
+void Coordinator::HandleExecStateTransition(
+    const ExecState old_state, const ExecState new_state) {
+  static const unordered_map<ExecState, const char *> exec_state_to_event{
+    {ExecState::EXECUTING,        "Executing"},
+    {ExecState::RETURNED_RESULTS, "Last row fetched"},
+    {ExecState::CANCELLED,        "Execution cancelled"},
+    {ExecState::ERROR,            "Execution error"}};
+  if (old_state == new_state) return;
+  // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
+  DCHECK_EQ(old_state, ExecState::EXECUTING);
+  // Should never transition to the initial state.
+  DCHECK_NE(new_state, ExecState::EXECUTING);
+
+  query_events_->MarkEvent(exec_state_to_event.at(new_state));
+  // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of
+  // cancel rpc. And in the case of EOS, sink already knows this.
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
+  // This thread won the race to transitioning into a terminal state - terminate
+  // execution and release resources.
+  ReleaseExecResources();
+  if (new_state == ExecState::RETURNED_RESULTS) {
+    // TODO: IMPALA-6984: cancel all backends in this case too.
+    WaitForBackends();
   } else {
-    VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
-               << backend_hostname;
+    CancelBackends();
   }
-  return query_status_;
+  ReleaseAdmissionControlResources();
+  // Can compute summary only after we stop accepting reports from the backends. Both
+  // WaitForBackends() and CancelBackends() ensures that.
+  // TODO: should move this off of the query execution path?
+  ComputeQuerySummary();
 }
 
 Status Coordinator::FinalizeHdfsInsert() {
@@ -491,7 +550,7 @@ Status Coordinator::FinalizeHdfsInsert() {
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
-  Status return_status = GetStatus();
+  Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
     RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -517,22 +576,13 @@ Status Coordinator::FinalizeHdfsInsert() {
   return return_status;
 }
 
-Status Coordinator::WaitForBackendCompletion() {
-  unique_lock<mutex> l(lock_);
-  while (num_remaining_backends_ > 0 && query_status_.ok()) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, "
-               << num_remaining_backends_ << " remaining. query_id="
-               << PrintId(query_id());
-    backend_completion_cv_.Wait(l);
+void Coordinator::WaitForBackends() {
+  int32_t num_remaining = backend_exec_complete_barrier_->pending();
+  if (num_remaining > 0) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
+               << " remaining. query_id=" << PrintId(query_id());
+    backend_exec_complete_barrier_->Wait();
   }
-  if (query_status_.ok()) {
-    VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
-  } else {
-    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
-               << PrintId(query_id()) << ". " << query_status_.GetDetail();
-  }
-
-  return query_status_;
 }
 
 Status Coordinator::Wait() {
@@ -543,34 +593,22 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
-        runtime_state()->fragment_instance_id());
+    return UpdateExecState(coord_instance_->WaitForOpen(),
+        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
   }
-
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // Query finalization can only happen when all backends have reported relevant
-  // state. They only have relevant state to report in the parallel INSERT case,
-  // otherwise all the relevant state is from the coordinator fragment which will be
-  // available after Open() returns.  Ignore the returned status if finalization is
-  // required., since FinalizeHdfsInsert() will pick it up and needs to execute
-  // regardless.
-  Status status = WaitForBackendCompletion();
-  if (finalize_params() == nullptr && !status.ok()) return status;
-
-  // Execution of query fragments has finished. We don't need to hold onto query execution
-  // resources while we finalize the query.
-  ReleaseExecResources();
-  // Query finalization is required only for HDFS table sinks
-  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
-  // Release admission control resources after we'd done the potentially heavyweight
-  // finalization.
-  ReleaseAdmissionControlResources();
-
+  // DML finalization can only happen when all backends have completed all side-effects
+  // and reported relevant state.
+  WaitForBackends();
+  if (finalize_params() != nullptr) {
+    RETURN_IF_ERROR(UpdateExecState(
+            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
+  }
+  // DML requests are finished at this point.
+  RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
-  // For DML queries, when Wait is done, the query is complete.
-  ComputeQuerySummary();
-  return status;
+  return Status::OK();
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -578,88 +616,54 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  if (returned_all_results_) {
-    // May be called after the first time we set *eos. Re-set *eos and return here;
-    // already torn-down coord_sink_ so no more work to do.
+  // exec_state_lock_ not needed here though since this path won't execute concurrently
+  // with itself or DML finalization.
+  if (exec_state_ == ExecState::RETURNED_RESULTS) {
+    // Nothing left to do: already in a terminal state and no more results.
     *eos = true;
     return Status::OK();
   }
+  DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
+  DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
+  RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  DCHECK(coord_sink_ != nullptr)
-      << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
-      << "checked?";
-  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
-
-  // if there was an error, we need to return the query's error status rather than
-  // the status we just got back from the local executor (which may well be CANCELLED
-  // in that case).  Coordinator fragment failed in this case so we log the query_id.
-  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
-      runtime_state()->fragment_instance_id()));
-
-  if (*eos) {
-    returned_all_results_ = true;
-    query_events_->MarkEvent("Last row fetched");
-    // release query execution resources here, since we won't be fetching more result rows
-    ReleaseExecResources();
-    // wait for all backends to complete before computing the summary
-    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
-    RETURN_IF_ERROR(WaitForBackendCompletion());
-    // Release admission control resources after backends are finished.
-    ReleaseAdmissionControlResources();
-    // if the query completed successfully, compute the summary
-    if (query_status_.ok()) ComputeQuerySummary();
-  }
-
+  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
+  RETURN_IF_ERROR(UpdateExecState(
+          status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
+  if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
   return Status::OK();
 }
 
-void Coordinator::Cancel(const Status* cause) {
-  lock_guard<mutex> l(lock_);
-  // if the query status indicates an error, cancellation has already been initiated;
-  // prevent others from cancelling a second time
-  if (!query_status_.ok()) return;
-
-  // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
-  // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
-  // of a successful query. Need to clean up relationship between query_status_ here and
-  // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
-  CancelInternal();
+void Coordinator::Cancel() {
+  // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
+  // RPC passing the exec RPC.
+  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
+      exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
+  discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
 }
 
-void Coordinator::CancelInternal() {
-  VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
-  // TODO: remove when restructuring cancellation, which should happen automatically
-  // as soon as the coordinator knows that the query is finished
-  DCHECK(!query_status_.ok());
-
+void Coordinator::CancelBackends() {
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
     DCHECK(backend_state != nullptr);
     if (backend_state->Cancel()) ++num_cancelled;
   }
+  backend_exec_complete_barrier_->NotifyRemaining();
+
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
-  backend_completion_cv_.NotifyAll();
-
-  ReleaseExecResourcesLocked();
-  ReleaseAdmissionControlResourcesLocked();
-  // Report the summary with whatever progress the query made before being cancelled.
-  ComputeQuerySummary();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
+  VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
+            << " backend_idx=" << params.coord_state_idx;
   if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
-  // TODO: return here if returned_all_results_?
-  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
-  // path more irregular.
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -668,46 +672,30 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
-    // This report made this backend done, so update the status and
-    // num_remaining_backends_.
-
-    // for now, abort the query if we see any error except if returned_all_results_ is
-    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
-    // TODO: clarify control flow here, it's unclear we should even process this status
-    // report if returned_all_results_ is true
+    // This backend execution has completed.
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    if (!status.ok() && !returned_all_results_) {
-      Status ignored =
-          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
-              is_fragment_failure, failed_instance_id);
-      return Status::OK();
-    }
-
-    lock_guard<mutex> l(lock_);
-    DCHECK_GT(num_remaining_backends_, 0);
-    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
-      VLOG_QUERY << "Backend completed: "
-          << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-          << " remaining=" << num_remaining_backends_ - 1
-          << " query_id=" << PrintId(query_id());
+    int pending_backends = backend_exec_complete_barrier_->Notify();
+    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
+      VLOG_QUERY << "Backend completed:"
+                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+                 << " remaining=" << pending_backends
+                 << " query_id=" << PrintId(query_id());
       BackendState::LogFirstInProgress(backend_states_);
     }
-    if (--num_remaining_backends_ == 0 || !status.ok()) {
-      backend_completion_cv_.NotifyAll();
+    if (!status.ok()) {
+      // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than
+      // from within this RPC handler (since it can make RPCs).
+      discard_result(UpdateExecState(status,
+              is_fragment_failure ? &failed_instance_id : nullptr,
+              TNetworkAddressToString(backend_state->impalad_address())));
     }
-    return Status::OK();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.
-  if (returned_all_results_) return Status::CANCELLED;
-
-  return Status::OK();
-}
-
-RuntimeState* Coordinator::runtime_state() {
-  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
+  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
+  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
 }
 
 // TODO: add histogram/percentile
@@ -740,20 +728,14 @@ void Coordinator::ComputeQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  for (BackendState* state: backend_states_) {
-    state->MergeErrorLog(&merged);
+  {
+    lock_guard<SpinLock> l(backend_states_init_lock_);
+    for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
 void Coordinator::ReleaseExecResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseExecResourcesLocked();
-}
-
-void Coordinator::ReleaseExecResourcesLocked() {
-  if (released_exec_resources_) return;
-  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -767,8 +749,6 @@ void Coordinator::ReleaseExecResourcesLocked() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
-  // Need to protect against failed Prepare(), where root_sink() would not be set.
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // Now that we've released our own resources, can release query-wide resources.
   if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -776,27 +756,20 @@ void Coordinator::ReleaseExecResourcesLocked() {
 }
 
 void Coordinator::ReleaseAdmissionControlResources() {
-  lock_guard<mutex> l(lock_);
-  ReleaseAdmissionControlResourcesLocked();
-}
-
-void Coordinator::ReleaseAdmissionControlResourcesLocked() {
-  if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admission control resources for query_id="
-            << PrintId(query_ctx().query_id);
+  LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
-  released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_complete_barrier_.get() != nullptr)
+  DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
-  exec_complete_barrier_->Wait();
+
+  exec_rpcs_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
@@ -867,6 +840,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params.__set_dst_query_id(query_id());
   rpc_params.__set_filter_id(params.filter_id);
 
+  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -940,23 +914,19 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
-const TUniqueId& Coordinator::query_id() const {
-  return query_ctx().query_id;
-}
-
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
 }
 
 MemTracker* Coordinator::query_mem_tracker() const {
-  return query_state()->query_mem_tracker();
+  return query_state_->query_mem_tracker();
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->ToJson(&val, doc);
@@ -969,7 +939,7 @@ void Coordinator::BackendsToJson(Document* doc) {
 void Coordinator::FInstanceStatsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<mutex> l(lock_);
+    lock_guard<SpinLock> l(backend_states_init_lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->InstanceStatsToJson(&val, doc);
@@ -979,6 +949,14 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
 
+const TQueryCtx& Coordinator::query_ctx() const {
+  return schedule_.request().query_ctx;
+}
+
+const TUniqueId& Coordinator::query_id() const {
+  return query_ctx().query_id;
+}
+
 const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 8e556ec..a0b9b4c 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,29 +20,19 @@
 
 #include <string>
 #include <vector>
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
 #include <rapidjson/document.h>
 
 #include "common/global-types.h"
-#include "common/hdfs.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
-#include "runtime/query-state.h"
-#include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
+#include "util/runtime-profile-counters.h"
 
 namespace impala {
 
@@ -55,6 +45,7 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class QueryResultSet;
+class QuerySchedule;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
@@ -64,10 +55,9 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either through cancellation or
-/// by returning eos, the coordinator releases resources. (Note that DML requests
-/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
-/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
+/// query, including cancellation. Once a query ends, either by returning EOS, through
+/// client cancellation, returning an error, or by finalizing a DML request, the
+/// coordinator releases resources.
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -76,80 +66,80 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Thread-safe, with the exception of GetNext().
-//
+/// Thread-safe except where noted.
+///
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
-/// The implementation ensures that setting an overall error status and initiating
-/// cancellation of all fragment instances is atomic.
+/// A query is considered to be executing until one of three things occurs:
+/// 1. An error is encountered. Backend cancellation is automatically initiated for all
+///    backends that haven't yet completed and the overall query status is set to the
+///    first (non-cancelled) encountered error status.
+/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
+///    is set to CANCELLED and cancellation is initiated for all backends still
+///    executing (without an error status).
+/// 3. The query has returned all rows. The overall query status is OK (and remains
+///    OK). Client cancellation is no longer possible and subsequent backend errors are
+///    ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
+///
+/// Lifecycle: this object must not be destroyed until after one of the three states
+/// above is reached (error, cancelled, or EOS) to ensure resources are released.
+///
+/// Lock ordering: (lower-numbered acquired before higher-numbered)
+/// 1. wait_lock_
+/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
+///    ExecSummary::lock (leafs)
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
-/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
-/// TODO: clarify cancellation path; in particular, cancel as soon as we return
-/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
   Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. When it returns,
-  /// all fragment instances have started executing at their respective backends.
-  /// A call to Exec() must precede all other member function calls.
+  /// Initiate asynchronous execution of a query with the given schedule. When it
+  /// returns, all fragment instances have started executing at their respective
+  /// backends. Exec() must be called exactly once and a call to Exec() must precede
+  /// all other member function calls.
   Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
-  /// query doesn't return rows, until the query finishes or is cancelled.
-  /// A call to Wait() must precede all calls to GetNext().
-  /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
-  /// Wait() calls concurrently.
+  /// query doesn't return rows, until the query finishes or is cancelled. A call to
+  /// Wait() must precede all calls to GetNext().  Multiple calls to Wait() are
+  /// idempotent and it is okay to issue multiple Wait() calls concurrently.
   Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
-  /// rows, but will not return more.
-  ///
-  /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
-  /// no-op.
-  ///
-  /// GetNext() will not set *eos=true until all fragment instances have either completed
-  /// or have failed.
-  ///
-  /// Returns an error status if an error was encountered either locally or by any of the
-  /// remote fragments or if the query was cancelled.
+  /// rows, but will not return more. If *eos is true, all rows have been returned.
+  /// Returns a non-OK status if an error was encountered either locally or by any of
+  /// the executing backends, or if the query was cancelled via Cancel().  After *eos
+  /// is true, subsequent calls to GetNext() will be a no-op.
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
-  /// calls (but may call any of the other member functions concurrently with GetNext()).
+  /// calls.
   Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Cancel execution of query. This includes the execution of the local plan fragment,
-  /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
-  /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
-  /// Idempotent.
-  void Cancel(const Status* cause = nullptr);
+  /// Cancel execution of query and sets the overall query status to CANCELLED if the
+  /// query is still executing. Idempotent.
+  void Cancel();
 
-  /// Updates execution status of a particular backend as well as dml_exec_state_.
-  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
-  /// error status.
+  /// Called by the report status RPC handler to update execution status of a
+  /// particular backend as well as dml_exec_state_ and the profile.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Only valid to call after Exec().
-  QueryState* query_state() const { return query_state_; }
-
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_; }
 
-  const TUniqueId& query_id() const;
-
+  /// Safe to call only after Exec().
   MemTracker* query_mem_tracker() const;
 
-  /// This is safe to call only after Wait()
+  /// Safe to call only after Wait().
   DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
@@ -158,9 +148,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const ProgressUpdater& progress() { return progress_; }
 
-  /// Returns query_status_.
-  Status GetStatus();
-
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);
 
@@ -187,18 +174,20 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
-  /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
+  /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
   TStmtType::type stmt_type_;
 
-  /// BackendStates for all execution backends, including the coordinator.
-  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
-  /// InitBackendExec().
+  /// BackendStates for all execution backends, including the coordinator. All elements
+  /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
   std::vector<BackendState*> backend_states_;
 
-  // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment
-  int coord_backend_idx_ = -1;
+  /// Protects the population of backend_states_ vector (not the BackendState objects).
+  /// Used when accessing backend_states_ if it's not guaranteed that
+  /// InitBackendStates() has completed.
+  SpinLock backend_states_init_lock_;
 
-  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
+  /// The QueryState for this coordinator. Reference taken in Exec(). Reference
+  /// released in destructor.
   QueryState* query_state_ = nullptr;
 
   /// Non-null if and only if the query produces results for the client; i.e. is of
@@ -209,22 +198,28 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
-  /// reference of QueryState released) in TearDown().
+  /// Owned by the QueryState. Set in Exec().
   FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
-  /// GetNext() hits eos.
+  /// Owned by the QueryState. Set in Exec().
   PlanRootSink* coord_sink_ = nullptr;
 
-  /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this
+  /// ensures single-threaded execution of Wait(). See lock ordering class comment.
   boost::mutex wait_lock_;
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
-  /// Keeps track of number of completed ranges and total scan ranges.
+  /// Keeps track of number of completed ranges and total scan ranges. Initialized by
+  /// Exec().
   ProgressUpdater progress_;
 
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
+  RuntimeProfile* query_profile_ = nullptr;
+
+  /// Total time spent in finalization (typically 0 except for INSERT into hdfs
+  /// tables). Set in Exec().
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+
   /// Total number of filter updates received (always 0 if filter mode is not
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -255,6 +250,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     void Init(const QuerySchedule& query_schedule);
   };
 
+  // Initialized by Exec().
   ExecSummary exec_summary_;
 
   /// Filled in as the query completes and tracks the results of DML queries.  This is
@@ -262,52 +258,40 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// coordinator fragment: only one of the two can legitimately produce updates.
   DmlExecState dml_exec_state_;
 
-  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
-  RuntimeProfile* query_profile_ = nullptr;
-
-  /// Protects all fields below. This is held while making RPCs, so this lock should
-  /// only be acquired if the acquiring thread is prepared to wait for a significant
-  /// time.
-  /// TODO: clarify to what extent the fields below need to be protected by lock_
-  /// Lock ordering is
-  /// 1. wait_lock_
-  /// 2. lock_
-  /// 3. BackendState::lock_
-  /// 4. filter_lock_
-  boost::mutex lock_;
-
-  /// Overall status of the entire query; set to the first reported fragment error
-  /// status or to CANCELLED, if Cancel() is called.
-  Status query_status_;
-
-  /// If true, the query is done returning all results.  It is possible that the
-  /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
-  /// with limit)
-  /// Once this is set to true, errors from execution backends are ignored.
-  bool returned_all_results_ = false;
-
-  /// If there is no coordinator fragment, Wait() simply waits until all
-  /// backends report completion by notifying on backend_completion_cv_.
-  /// Tied to lock_.
-  ConditionVariable backend_completion_cv_;
-
-  /// Count of the number of backends for which done != true. When this
-  /// hits 0, any Wait()'ing thread is notified
-  int num_remaining_backends_ = 0;
-
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-  /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(),
-  /// elements live in obj_pool().
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in
+  /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
+  /// sequentially, without synchronization.
   std::vector<FragmentStats*> fragment_stats_;
 
-  /// total time spent in finalization (typically 0 except for INSERT into hdfs tables)
-  RuntimeProfile::Counter* finalization_timer_ = nullptr;
+  /// Barrier that is released when all calls to BackendState::Exec() have
+  /// returned. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
+
+  /// Barrier that is released when all backends have indicated execution completion,
+  /// or when all backends are cancelled due to an execution error or client requested
+  /// cancellation. Initialized in StartBackendExec().
+  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
+
+  SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
 
-  /// Barrier that is released when all calls to ExecRemoteFragment() have
-  /// returned, successfully or not. Initialised during Exec().
-  boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
+  /// EXECUTING: in-flight; the only non-terminal state
+  /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
+  /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
+  /// ERROR: received an error from a backend
+  enum class ExecState {
+    EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
+  };
+  ExecState exec_state_ = ExecState::EXECUTING;
+
+  /// Overall execution status; only set on exec_state_ transitions:
+  /// - EXECUTING: OK
+  /// - RETURNED_RESULTS: OK
+  /// - CANCELLED: CANCELLED
+  /// - ERROR: error status
+  Status exec_status_;
 
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
@@ -320,12 +304,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if ReleaseExecResources() has been called.
-  bool released_exec_resources_ = false;
-
-  /// True if and only if ReleaseAdmissionControlResources() has been called.
-  bool released_admission_control_resources_ = false;
-
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -333,36 +311,67 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// HDFS INSERT finalization is not required.
   const TFinalizeParams* finalize_params() const;
 
-  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
+  const TQueryCtx& query_ctx() const;
 
-  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
-  /// produce any rows.
-  RuntimeState* runtime_state();
+  const TUniqueId& query_id() const;
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when
-  /// the query is not being cancelled in the case where the query limit is reached.
-  void CancelInternal();
-
-  /// Acquires lock_ and updates query_status_ with 'status' if it's not already
-  /// an error status, and returns the current query_status_. The status may be
-  /// due to an error in a specific fragment instance, or it can be a general error
-  /// not tied to a specific fragment instance.
-  /// Calls CancelInternal() when switching to an error status.
-  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
-  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
-  /// reporting. For a general error not tied to a specific instance,
-  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
-  /// 'backend_hostname' is used for error reporting in either case.
-  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
-      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
-
-  /// Returns only when either all execution backends have reported success or the query
-  /// is in error. Returns the status of the query.
-  /// It is safe to call this concurrently, but any calls must be made only after Exec().
-  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
+  /// Called when the query is done executing due to reaching EOS or client
+  /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
+  /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
+  /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
+  /// either way, calls ComputeQuerySummary() and releases resources). Returns the
+  /// resulting overall execution status.
+  Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
+
+  /// Transitions 'exec_state_' given an execution status and returns the resulting
+  /// overall status:
+  ///
+  /// - if the 'status' parameter is ok, no state transition
+  /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
+  /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
+  ///   transition state (those are terminal states) however in the case of ERROR,
+  ///   status may be updated to a more interesting status.
+  ///
+  /// Should not be called for (client initiated) cancellation. Call
+  /// SetNonErrorTerminalState(CANCELLED) instead.
+  ///
+  /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
+  /// failure is not specific to a fragment instance), used for error reporting along
+  /// with 'instance_hostname'.
+  Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
+      const string& instance_hostname) WARN_UNUSED_RESULT;
+
+  /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller
+  /// transitioned to a terminal state (which happens exactly once for the lifetime of
+  /// the Coordinator object), then finalizes execution (cancels remaining backends if
+  /// transitioning to CANCELLED; in all cases releases resources and calls
+  /// ComputeQuerySummary().
+  void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
+
+  /// Return true if 'exec_state_' is RETURNED_RESULTS.
+  /// TODO: remove with IMPALA-6984.
+  bool ReturnedAllResults() WARN_UNUSED_RESULT;
+
+  /// Return the string representation of 'state'.
+  static const char* ExecStateToString(const ExecState state);
+
+  // For DCHECK_EQ, etc of ExecState values.
+  friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
+    return o << ExecStateToString(s);
+  }
+
+  /// Helper for HandleExecStateTransition(). Sends cancellation request to all
+  /// executing backends but does not wait for acknowledgement from the backends. The
+  /// ExecState state-machine ensures this is called at most once.
+  void CancelBackends();
+
+  /// Returns only when either all execution backends have reported success or a request
+  /// to cancel the backends has already been sent. It is safe to call this concurrently,
+  /// but any calls must be made only after Exec().
+  void WaitForBackends();
 
   /// Initializes fragment_stats_ and query_profile_. Must be called before
   /// InitBackendStates().
@@ -384,36 +393,33 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// finishing the INSERT in flight.
   Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
-  /// Populates backend_states_, starts query execution at all backends in parallel, and
-  /// blocks until startup completes.
+  /// Helper for Exec(). Populates backend_states_, starts query execution at all
+  /// backends in parallel, and blocks until startup completes.
   void StartBackendExec();
 
-  /// Calls CancelInternal() and returns an error if there was any error starting
-  /// backend execution.
-  /// Also updates query_profile_ with the startup latency histogram.
+  /// Helper for Exec(). Checks for errors encountered when starting backend execution,
+  /// using any non-OK status, if any, as the overall status. Returns the overall
+  /// status. Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
+  /// Helper for HandleExecStateTransition(). Releases all resources associated with
+  /// query execution. The ExecState state-machine ensures this is called exactly once.
   void ReleaseExecResources();
 
-  /// Same as ReleaseExecResources() except the lock must be held by the caller.
-  void ReleaseExecResourcesLocked();
-
-  /// Releases admission control resources for use by other queries.
-  /// This should only be called if one of following preconditions is satisfied for each
-  /// backend on which the query is executing:
-  /// * The backend finished execution.
-  ///   Rationale: the backend isn't consuming resources.
-  //
+  /// Helper for HandleExecStateTransition(). Releases admission control resources for
+  /// use by other queries. This should only be called if one of following
+  /// preconditions is satisfied for each backend on which the query is executing:
+  ///
+  /// * The backend finished execution.  Rationale: the backend isn't consuming
+  ///   resources.
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
   ///   time a newly admitted query fragment starts up on the backend and starts consuming
   ///   resources, the resources from this query will probably have been released.
-  //
   /// * Sending the cancellation RPC to the backend failed
   ///   Rationale: the backend is either down or will tear itself down when it next tries
   ///   to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -424,16 +430,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///   pessimistically queueing queries while we wait for a response from a backend that
   ///   may never come.
   ///
-  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
-  /// sufficient to satisfy the above preconditions. If the query has an expensive
-  /// finalization step post query execution (e.g. a DML statement), then this should
-  /// be called after that completes to avoid over-admitting queries.
+  /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
+  /// to satisfy the above preconditions. If the query has an expensive finalization
+  /// step post query execution (e.g. a DML statement), then this should be called
+  /// after that completes to avoid over-admitting queries.
   ///
-  /// Acquires lock_. Idempotent.
+  /// The ExecState state-machine ensures this is called exactly once.
   void ReleaseAdmissionControlResources();
-
-  /// Same as ReleaseAdmissionControlResources() except lock must be held by caller.
-  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 12b9b78..2ca1256 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
 
   // Cancel the parent query. 'lock_' should not be held because cancellation involves
   // RPCs and can block for a long time.
-  if (coord != NULL) coord->Cancel(cause);
+  if (coord != NULL) coord->Cancel();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index fb3f261..3af4c9b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,11 +111,6 @@ class ClientRequestState;
 /// 6. ClientRequestState::expiration_data_lock_
 /// 7. Coordinator::exec_summary_lock
 ///
-/// Coordinator::lock_ should not be acquired at the same time as the
-/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
-/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of
-/// the above lock ordering.
-///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
 /// * session_timeout_lock_

http://git-wip-us.apache.org/repos/asf/impala/blob/da329442/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 49b0bde..827c526 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,8 +33,23 @@ class CountingBarrier {
   }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
-  void Notify() {
-    if (count_.Add(-1) == 0) promise_.Set(true);
+  /// Returns the remaining pending notifications.
+  int32_t Notify() {
+    int32_t result = count_.Add(-1);
+    if (result == 0) promise_.Set(true);
+    return result;
+  }
+
+  /// Sets the number of pending notifications to 0 and unblocks Wait().
+  void NotifyRemaining() {
+    while (true) {
+      int32_t value = count_.Load();
+      if (value <= 0) return;  // count_ can legitimately drop below 0
+      if (count_.CompareAndSwap(value, 0)) {
+        promise_.Set(true);
+        return;
+      }
+    }
   }
 
   /// Blocks until all notifications are received.
@@ -44,6 +59,8 @@ class CountingBarrier {
   /// case '*timed_out' will be true.
   void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
 
+  int32_t pending() const { return count_.Load(); }
+
  private:
   /// Used to signal waiters when all notifications are received.
   Promise<bool> promise_;