Posted to commits@impala.apache.org by mi...@apache.org on 2018/05/16 15:21:51 UTC

[1/3] impala git commit: IMPALA-6645: Enable disk spill encryption by default

Repository: impala
Updated Branches:
  refs/heads/2.x 0ce9056a1 -> bee907c5f


IMPALA-6645: Enable disk spill encryption by default

Perf:
Targeted benchmarks with a heavily spilling query on a machine
with PCLMULQDQ support show that less than 5% of CPU time is spent
in encryption and decryption. PCLMULQDQ was introduced in AMD
Bulldozer (c. 2011) and Intel Westmere (c. 2010).
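
A quick way to confirm that a Linux host advertises the instruction
(an illustrative check, not part of this commit; assumes /proc/cpuinfo
is available, as on standard Linux kernels):

  # Minimal sketch: Linux exposes instruction set extensions as flags
  # in /proc/cpuinfo; 'pclmulqdq' appears when the CPU supports it.
  with open('/proc/cpuinfo') as f:
      print('PCLMULQDQ supported:', 'pclmulqdq' in f.read())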

Testing:
Ran core tests with the change.

Updated the custom cluster test to exercise the non-default
configuration.

Change-Id: Iee4be2a95d689f66c3663d99e4df0fb3968893a9
Reviewed-on: http://gerrit.cloudera.org:8080/10345
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e4fcc31c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e4fcc31c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e4fcc31c

Branch: refs/heads/2.x
Commit: e4fcc31cf1385fa99c06a04c0c4edecfbf5dbd95
Parents: 0ce9056
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 11 11:00:35 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 16 00:53:41 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/tmp-file-mgr.cc                  |  2 +-
 .../queries/QueryTest/basic-spilling.test       | 16 +++++++++
 .../QueryTest/disk-spill-encryption.test        | 15 ---------
 .../test_disk_spill_configurations.py           | 34 ++++++++++++++++++++
 .../test_disk_spill_encryption.py               | 32 ------------------
 5 files changed, 51 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 04e15d4..b995518 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -38,7 +38,7 @@
 
 #include "common/names.h"
 
-DEFINE_bool(disk_spill_encryption, false,
+DEFINE_bool(disk_spill_encryption, true,
     "Set this to encrypt and perform an integrity "
     "check on all data spilled to disk during a query");
 DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
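
With encryption now on by default, deployments that want the previous
behaviour can pass the flag explicitly when starting the daemon (an
illustrative invocation, with other startup flags omitted; both flags
shown are defined in tmp-file-mgr.cc above):

  impalad --disk_spill_encryption=false --scratch_dirs=/tmp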

http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test b/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
new file mode 100644
index 0000000..513ba2c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# A basic spilling query to exercise spill-to-disk end-to-end.
+set buffer_pool_limit=90m;
+set default_spillable_buffer_size=64k;
+select count(*)
+from (select distinct o_orderdate, o_custkey, o_comment
+      from tpch_parquet.orders) v;
+---- RESULTS
+1500000
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that spilling was activated.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
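
For readers unfamiliar with the row_regex directive: as the in-file
comment suggests, it checks that some line of the runtime profile matches,
and \([1-9][0-9]*\) only accepts a nonzero parenthesized counter value,
i.e. at least one spilled partition. A minimal sketch of the match against
a hypothetical profile counter line:

  import re
  line = '   - SpilledPartitions: 4 (4)'  # hypothetical profile line
  assert re.match(r'.*SpilledPartitions: .* \([1-9][0-9]*\)', line)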

http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test b/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
deleted file mode 100644
index 48649e5..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
+++ /dev/null
@@ -1,15 +0,0 @@
-====
----- QUERY
-set buffer_pool_limit=90m;
-set default_spillable_buffer_size=64k;
-select count(*)
-from (select distinct o_orderdate, o_custkey, o_comment
-      from tpch_parquet.orders) v;
----- RESULTS
-1500000
----- TYPES
-BIGINT
----- RUNTIME_PROFILE
-# Verify that spilling was activated.
-row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/tests/custom_cluster/test_disk_spill_configurations.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_disk_spill_configurations.py b/tests/custom_cluster/test_disk_spill_configurations.py
new file mode 100644
index 0000000..efddd23
--- /dev/null
+++ b/tests/custom_cluster/test_disk_spill_configurations.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestDiskSpillConfigurations(CustomClusterTestSuite):
+  """Tests to exercise non-default disk spill configurations end-to-end."""
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--disk_spill_encryption=false")
+  def test_disk_spill_encryption_disabled(self, vector):
+    """Disk spill encryption is enabled by default. We only need a custom cluster to test
+    the non-default configuration."""
+    self.run_test_case('QueryTest/basic-spilling', vector)
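
The same pattern extends to other spill-related flags. A hypothetical
follow-on test (not part of this commit; it assumes with_args accepts a
space-separated flag string and that the scratch directory exists) could
pin a non-default scratch directory as well:

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      "--disk_spill_encryption=false --scratch_dirs=/tmp/custom-scratch")
  def test_disk_spill_encryption_disabled_custom_dir(self, vector):
    # Hypothetical variant: unencrypted spill against a non-default directory.
    self.run_test_case('QueryTest/basic-spilling', vector)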

http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/tests/custom_cluster/test_disk_spill_encryption.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_disk_spill_encryption.py b/tests/custom_cluster/test_disk_spill_encryption.py
deleted file mode 100644
index c9a5aeb..0000000
--- a/tests/custom_cluster/test_disk_spill_encryption.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-
-class TestDiskSpillEncryption(CustomClusterTestSuite):
-  """ Tests to exercise disk spill encryption end-to-end. """
-
-  @classmethod
-  def get_workload(self):
-    return 'functional-query'
-
-  @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("--disk_spill_encryption=true")
-  def test_spilling_query(self, vector):
-    self.run_test_case('QueryTest/disk-spill-encryption', vector)


[3/3] impala git commit: IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row

Posted by mi...@apache.org.
IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row

TestQueries.test_subquery sometimes fails during exhaustive
tests.

The tests expect to catch an exception whose message is
prefixed with the "Query aborted:" string. The prefix is
usually added by impala_beeswax.py::wait_for_completion(),
but in rare cases it is not added.

From the point of view of the test it is irrelevant whether
the exception is prefixed with "Query aborted:", so I removed
the prefix from the expected exception strings.
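
In other words, matching on the unprefixed substring covers both cases.
A minimal sketch, assuming (as the CATCH sections in the diff below
suggest) that the expected text is matched as a substring of the actual
error message:

  expected = 'Subquery must not return more than one row:'
  for actual in ('Query aborted:Subquery must not return more than one row:',
                 'Subquery must not return more than one row:'):
      assert expected in actual  # holds whether or not the prefix was added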

Change-Id: I3b8655ad273b1dd7a601099f617db609e4a4797b
Reviewed-on: http://gerrit.cloudera.org:8080/10407
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bee907c5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bee907c5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bee907c5

Branch: refs/heads/2.x
Commit: bee907c5fce6a7f3499b45bd13aa7815ccbb27d8
Parents: 0e6117b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue May 15 15:47:29 2018 +0200
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 16 10:10:18 2018 +0000

----------------------------------------------------------------------
 .../functional-query/queries/QueryTest/subquery.test         | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bee907c5/testdata/workloads/functional-query/queries/QueryTest/subquery.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/subquery.test b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
index 2d691ed..df1be9a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/subquery.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
@@ -929,7 +929,7 @@ WHERE int_col =
 ORDER BY id
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Uncorrelated subquery in arithmetic expr that returns multiple rows
@@ -940,7 +940,7 @@ WHERE int_col =
 ORDER BY id
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Uncorrelated subquery in binary predicate that returns scalar value at runtime
@@ -1000,7 +1000,7 @@ SELECT a FROM (values(1 a),(2),(3)) v
 WHERE a = (SELECT x FROM (values(1 x),(2),(3)) v)
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
 ====
 ---- QUERY
 # Subquery that returns more than one row
@@ -1009,7 +1009,7 @@ SELECT id FROM functional.alltypes
 WHERE id = (SELECT bigint_col FROM functional.alltypes_view)
 ---- RESULTS
 ---- CATCH
-Query aborted:Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
+Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
 ====
 ---- QUERY
 # Runtime scalar subquery with offset.


[2/3] impala git commit: IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash

Posted by mi...@apache.org.
IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash

Revert "IMPALA-5384, part 2: Simplify Coordinator locking and clarify state"

This reverts commit 6ca87e46736a1e591ed7d7d5fee05b4b4d2fbb50.

Change-Id: Idc63006e6e04130b2873a6a9730e434c563327c5
Reviewed-on: http://gerrit.cloudera.org:8080/10412
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10423
Reviewed-by: Dan Hecht <dh...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0e6117be
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0e6117be
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0e6117be

Branch: refs/heads/2.x
Commit: 0e6117be3be8475ea1aa727dec55979cbffd05b7
Parents: e4fcc31c
Author: Dan Hecht <dh...@cloudera.com>
Authored: Tue May 15 11:36:15 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed May 16 09:25:31 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.h |   8 -
 be/src/runtime/coordinator.cc              | 424 +++++++++++++-----------
 be/src/runtime/coordinator.h               | 330 +++++++++---------
 be/src/service/client-request-state.cc     |   2 +-
 be/src/service/impala-server.h             |   5 +
 be/src/util/counting-barrier.h             |  21 +-
 6 files changed, 395 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e7af2e2..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,17 +21,9 @@
 #include <vector>
 #include <unordered_set>
 
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "runtime/coordinator.h"
-#include "scheduling/query-schedule.h"
 #include "util/progress-updater.h"
 #include "util/stopwatch.h"
 #include "util/runtime-profile.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7e12f94..a423de8 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,7 +25,6 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
-#include "common/hdfs.h"
 #include "exec/data-sink.h"
 #include "exec/plan-root-sink.h"
 #include "gen-cpp/ImpalaInternalService.h"
@@ -40,7 +39,6 @@
 #include "runtime/query-state.h"
 #include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
-#include "scheduling/query-schedule.h"
 #include "util/bloom-filter.h"
 #include "util/counting-barrier.h"
 #include "util/hdfs-bulk-ops.h"
@@ -53,13 +51,16 @@
 
 using namespace apache::thrift;
 using namespace rapidjson;
+using namespace strings;
 using boost::algorithm::iequals;
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
 using boost::algorithm::token_compress_on;
 using boost::algorithm::split;
 using boost::filesystem::path;
+using std::unique_ptr;
 
+DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
 using namespace impala;
@@ -75,9 +76,11 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  // Must have entered a terminal exec state guaranteeing resources were released.
-  DCHECK_NE(exec_state_, ExecState::EXECUTING);
-  // Release the coordinator's reference to the query control structures.
+  DCHECK(released_exec_resources_)
+      << "ReleaseExecResources() must be called before Coordinator is destroyed";
+  DCHECK(released_admission_control_resources_)
+      << "ReleaseAdmissionControlResources() must be called before Coordinator is "
+      << "destroyed";
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -106,6 +109,12 @@ Status Coordinator::Exec() {
   bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
+  // to keep things simple, make async Cancel() calls wait until plan fragment
+  // execution has been initiated, otherwise we might try to cancel fragment
+  // execution at Impala daemons where it hasn't even started
+  // TODO: revisit this, it may not be true anymore
+  lock_guard<mutex> l(lock_);
+
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
   query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -129,9 +138,9 @@ Status Coordinator::Exec() {
     InitFilterRoutingTable();
   }
 
-  // At this point, all static setup is done and all structures are initialized. Only
-  // runtime-related state changes past this point (examples: fragment instance
-  // profiles, etc.)
+  // At this point, all static setup is done and all structures are initialized.
+  // Only runtime-related state changes past this point (examples:
+  // num_remaining_backends_, fragment instance profiles, etc.)
 
   StartBackendExec();
   RETURN_IF_ERROR(FinishBackendStartup());
@@ -146,7 +155,7 @@ Status Coordinator::Exec() {
       // which means we failed Prepare
       Status prepare_status = query_state_->WaitForPrepare();
       DCHECK(!prepare_status.ok());
-      return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
+      return prepare_status;
     }
 
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -160,6 +169,7 @@ Status Coordinator::Exec() {
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
   }
+
   return Status::OK();
 }
 
@@ -198,8 +208,6 @@ void Coordinator::InitFragmentStats() {
 void Coordinator::InitBackendStates() {
   int num_backends = schedule_.per_backend_exec_params().size();
   DCHECK_GT(num_backends, 0);
-
-  lock_guard<SpinLock> l(backend_states_init_lock_);
   backend_states_.resize(num_backends);
 
   RuntimeProfile::Counter* num_backends_counter =
@@ -207,13 +215,19 @@ void Coordinator::InitBackendStates() {
   num_backends_counter->Set(num_backends);
 
   // create BackendStates
+  bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
+  const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
+    if (has_coord_fragment && coord_address == entry.first) {
+      coord_backend_idx_ = backend_idx;
+    }
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(query_id(), backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
+  DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -327,8 +341,8 @@ void Coordinator::InitFilterRoutingTable() {
 
 void Coordinator::StartBackendExec() {
   int num_backends = backend_states_.size();
-  exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
-  backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+  exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+  num_remaining_backends_ = num_backends;
 
   DebugOptions debug_options(schedule_.query_options());
 
@@ -340,11 +354,11 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
-              exec_rpcs_complete_barrier_.get());
+            exec_complete_barrier_.get());
         });
   }
-  exec_rpcs_complete_barrier_->Wait();
 
+  exec_complete_barrier_->Wait();
   VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
              << PrintId(query_id());
   query_events_->MarkEvent(
@@ -353,24 +367,26 @@ void Coordinator::StartBackendExec() {
 }
 
 Status Coordinator::FinishBackendStartup() {
+  Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   // Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
   HistogramMetric latencies(def, 30 * 60 * 1000, 4);
-  Status status = Status::OK();
-  string error_hostname;
   for (BackendState* backend_state: backend_states_) {
     // preserve the first non-OK, if there is one
     Status backend_status = backend_state->GetStatus();
-    if (!backend_status.ok() && status.ok()) {
-      status = backend_status;
-      error_hostname = backend_state->impalad_address().hostname;
-    }
+    if (!backend_status.ok() && status.ok()) status = backend_status;
     latencies.Update(backend_state->rpc_latency());
   }
+
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
-  return UpdateExecState(status, nullptr, error_hostname);
+
+  if (!status.ok()) {
+    query_status_ = status;
+    CancelInternal();
+  }
+  return status;
 }
 
 string Coordinator::FilterDebugString() {
@@ -430,115 +446,40 @@ string Coordinator::FilterDebugString() {
   return Substitute("\n$0", table_printer.ToString());
 }
 
-const char* Coordinator::ExecStateToString(const ExecState state) {
-  static const unordered_map<ExecState, const char *> exec_state_to_str{
-    {ExecState::EXECUTING,        "EXECUTING"},
-    {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
-    {ExecState::CANCELLED,        "CANCELLED"},
-    {ExecState::ERROR,            "ERROR"}};
-  return exec_state_to_str.at(state);
+Status Coordinator::GetStatus() {
+  lock_guard<mutex> l(lock_);
+  return query_status_;
 }
 
-Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
-  DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
-  Status ret_status;
-  {
-    lock_guard<SpinLock> l(exec_state_lock_);
-    // May have already entered a terminal state, in which case nothing to do.
-    if (exec_state_ != ExecState::EXECUTING) return exec_status_;
-    DCHECK(exec_status_.ok()) << exec_status_;
-    exec_state_ = state;
-    if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
-    ret_status = exec_status_;
-  }
-  VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
-      state == ExecState::CANCELLED ? "cancelled" : "completed");
-  HandleExecStateTransition(ExecState::EXECUTING, state);
-  return ret_status;
-}
-
-Status Coordinator::UpdateExecState(const Status& status,
-    const TUniqueId* failed_finst, const string& instance_hostname) {
-  Status ret_status;
-  ExecState old_state, new_state;
+Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+    bool is_fragment_failure, const TUniqueId& instance_id) {
   {
-    lock_guard<SpinLock> l(exec_state_lock_);
-    old_state = exec_state_;
-    if (old_state == ExecState::EXECUTING) {
-      DCHECK(exec_status_.ok()) << exec_status_;
-      if (!status.ok()) {
-        // Error while executing - go to ERROR state.
-        exec_status_ = status;
-        exec_state_ = ExecState::ERROR;
-      }
-    } else if (old_state == ExecState::RETURNED_RESULTS) {
-      // Already returned all results. Leave exec status as ok, stay in this state.
-      DCHECK(exec_status_.ok()) << exec_status_;
-    } else if (old_state == ExecState::CANCELLED) {
-      // Client requested cancellation already, stay in this state.  Ignores errors
-      // after requested cancellations.
-      DCHECK(exec_status_.IsCancelled()) << exec_status_;
-    } else {
-      // Already in the ERROR state, stay in this state but update status to be the
-      // first non-cancelled status.
-      DCHECK_EQ(old_state, ExecState::ERROR);
-      DCHECK(!exec_status_.ok());
-      if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
-        exec_status_ = status;
-      }
-    }
-    new_state = exec_state_;
-    ret_status = exec_status_;
-  }
-  // Log interesting status: a non-cancelled error or a cancellation if was executing.
-  if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
-    VLOG_QUERY << Substitute(
-        "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
-        PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
-        instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
-        status.GetDetail());
-  }
-  // After dropping the lock, apply the state transition (if any) side-effects.
-  HandleExecStateTransition(old_state, new_state);
-  return ret_status;
-}
-
-bool Coordinator::ReturnedAllResults() {
-  lock_guard<SpinLock> l(exec_state_lock_);
-  return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
-void Coordinator::HandleExecStateTransition(
-    const ExecState old_state, const ExecState new_state) {
-  static const unordered_map<ExecState, const char *> exec_state_to_event{
-    {ExecState::EXECUTING,        "Executing"},
-    {ExecState::RETURNED_RESULTS, "Last row fetched"},
-    {ExecState::CANCELLED,        "Execution cancelled"},
-    {ExecState::ERROR,            "Execution error"}};
-  if (old_state == new_state) return;
-  // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
-  DCHECK_EQ(old_state, ExecState::EXECUTING);
-  // Should never transition to the initial state.
-  DCHECK_NE(new_state, ExecState::EXECUTING);
-
-  query_events_->MarkEvent(exec_state_to_event.at(new_state));
-  // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of
-  // cancel rpc. And in the case of EOS, sink already knows this.
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
-  // This thread won the race to transitioning into a terminal state - terminate
-  // execution and release resources.
-  ReleaseExecResources();
-  if (new_state == ExecState::RETURNED_RESULTS) {
-    // TODO: IMPALA-6984: cancel all backends in this case too.
-    WaitForBackends();
+    lock_guard<mutex> l(lock_);
+
+    // The query is done and we are just waiting for backends to clean up.
+    // Ignore their cancelled updates.
+    if (returned_all_results_ && status.IsCancelled()) return query_status_;
+
+    // nothing to update
+    if (status.ok()) return query_status_;
+
+    // don't override an error status; also, cancellation has already started
+    if (!query_status_.ok()) return query_status_;
+
+    query_status_ = status;
+    CancelInternal();
+  }
+
+  if (is_fragment_failure) {
+    // Log the id of the fragment that first failed so we can track it down more easily.
+    VLOG_QUERY << "query_id=" << PrintId(query_id())
+               << " failed because fragment_instance_id=" << PrintId(instance_id)
+               << " on host=" << backend_hostname << " failed.";
   } else {
-    CancelBackends();
+    VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
+               << backend_hostname;
   }
-  ReleaseAdmissionControlResources();
-  // Can compute summary only after we stop accepting reports from the backends. Both
-  // WaitForBackends() and CancelBackends() ensures that.
-  // TODO: should move this off of the query execution path?
-  ComputeQuerySummary();
+  return query_status_;
 }
 
 Status Coordinator::FinalizeHdfsInsert() {
@@ -550,7 +491,7 @@ Status Coordinator::FinalizeHdfsInsert() {
 
   VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
   SCOPED_TIMER(finalization_timer_);
-  Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
+  Status return_status = GetStatus();
   if (return_status.ok()) {
     HdfsTableDescriptor* hdfs_table;
     RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -576,13 +517,22 @@ Status Coordinator::FinalizeHdfsInsert() {
   return return_status;
 }
 
-void Coordinator::WaitForBackends() {
-  int32_t num_remaining = backend_exec_complete_barrier_->pending();
-  if (num_remaining > 0) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
-               << " remaining. query_id=" << PrintId(query_id());
-    backend_exec_complete_barrier_->Wait();
+Status Coordinator::WaitForBackendCompletion() {
+  unique_lock<mutex> l(lock_);
+  while (num_remaining_backends_ > 0 && query_status_.ok()) {
+    VLOG_QUERY << "Coordinator waiting for backends to finish, "
+               << num_remaining_backends_ << " remaining. query_id="
+               << PrintId(query_id());
+    backend_completion_cv_.Wait(l);
   }
+  if (query_status_.ok()) {
+    VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
+  } else {
+    VLOG_QUERY << "All backends finished due to one or more errors. query_id="
+               << PrintId(query_id()) << ". " << query_status_.GetDetail();
+  }
+
+  return query_status_;
 }
 
 Status Coordinator::Wait() {
@@ -593,22 +543,34 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateExecState(coord_instance_->WaitForOpen(),
-        &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
+    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
+        runtime_state()->fragment_instance_id());
   }
+
   DCHECK_EQ(stmt_type_, TStmtType::DML);
-  // DML finalization can only happen when all backends have completed all side-effects
-  // and reported relevant state.
-  WaitForBackends();
-  if (finalize_params() != nullptr) {
-    RETURN_IF_ERROR(UpdateExecState(
-            FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
-  }
-  // DML requests are finished at this point.
-  RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+  // Query finalization can only happen when all backends have reported relevant
+  // state. They only have relevant state to report in the parallel INSERT case,
+  // otherwise all the relevant state is from the coordinator fragment which will be
+  // available after Open() returns.  Ignore the returned status if finalization is
+  // required, since FinalizeHdfsInsert() will pick it up and needs to execute
+  // regardless.
+  Status status = WaitForBackendCompletion();
+  if (finalize_params() == nullptr && !status.ok()) return status;
+
+  // Execution of query fragments has finished. We don't need to hold onto query execution
+  // resources while we finalize the query.
+  ReleaseExecResources();
+  // Query finalization is required only for HDFS table sinks
+  if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
+  // Release admission control resources after we'd done the potentially heavyweight
+  // finalization.
+  ReleaseAdmissionControlResources();
+
   query_profile_->AddInfoString(
       "DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
-  return Status::OK();
+  // For DML queries, when Wait is done, the query is complete.
+  ComputeQuerySummary();
+  return status;
 }
 
 Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -616,54 +578,88 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   DCHECK(has_called_wait_);
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  // exec_state_lock_ not needed here though since this path won't execute concurrently
-  // with itself or DML finalization.
-  if (exec_state_ == ExecState::RETURNED_RESULTS) {
-    // Nothing left to do: already in a terminal state and no more results.
+  if (returned_all_results_) {
+    // May be called after the first time we set *eos. Re-set *eos and return here;
+    // already torn-down coord_sink_ so no more work to do.
     *eos = true;
     return Status::OK();
   }
-  DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
-  DCHECK(coord_sink_ != nullptr)     << "Exec() should be called first";
-  RuntimeState* runtime_state = coord_instance_->runtime_state();
 
-  Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
-  RETURN_IF_ERROR(UpdateExecState(
-          status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
-  if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+  DCHECK(coord_sink_ != nullptr)
+      << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
+      << "checked?";
+  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
+
+  // if there was an error, we need to return the query's error status rather than
+  // the status we just got back from the local executor (which may well be CANCELLED
+  // in that case).  Coordinator fragment failed in this case so we log the query_id.
+  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
+      runtime_state()->fragment_instance_id()));
+
+  if (*eos) {
+    returned_all_results_ = true;
+    query_events_->MarkEvent("Last row fetched");
+    // release query execution resources here, since we won't be fetching more result rows
+    ReleaseExecResources();
+    // wait for all backends to complete before computing the summary
+    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
+    RETURN_IF_ERROR(WaitForBackendCompletion());
+    // Release admission control resources after backends are finished.
+    ReleaseAdmissionControlResources();
+    // if the query completed successfully, compute the summary
+    if (query_status_.ok()) ComputeQuerySummary();
+  }
+
   return Status::OK();
 }
 
-void Coordinator::Cancel() {
-  // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
-  // RPC passing the exec RPC.
-  DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
-      exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
-  discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
+void Coordinator::Cancel(const Status* cause) {
+  lock_guard<mutex> l(lock_);
+  // if the query status indicates an error, cancellation has already been initiated;
+  // prevent others from cancelling a second time
+  if (!query_status_.ok()) return;
+
+  // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
+  // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
+  // of a successful query. Need to clean up relationship between query_status_ here and
+  // in QueryExecState. See IMPALA-4279.
+  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
+  CancelInternal();
 }
 
-void Coordinator::CancelBackends() {
+void Coordinator::CancelInternal() {
+  VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
+  // TODO: remove when restructuring cancellation, which should happen automatically
+  // as soon as the coordinator knows that the query is finished
+  DCHECK(!query_status_.ok());
+
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
     DCHECK(backend_state != nullptr);
     if (backend_state->Cancel()) ++num_cancelled;
   }
-  backend_exec_complete_barrier_->NotifyRemaining();
-
   VLOG_QUERY << Substitute(
       "CancelBackends() query_id=$0, tried to cancel $1 backends",
       PrintId(query_id()), num_cancelled);
+  backend_completion_cv_.NotifyAll();
+
+  ReleaseExecResourcesLocked();
+  ReleaseAdmissionControlResourcesLocked();
+  // Report the summary with whatever progress the query made before being cancelled.
+  ComputeQuerySummary();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
-            << " backend_idx=" << params.coord_state_idx;
+  VLOG_FILE << "UpdateBackendExecStatus()  backend_idx=" << params.coord_state_idx;
   if (params.coord_state_idx >= backend_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
   BackendState* backend_state = backend_states_[params.coord_state_idx];
+  // TODO: return here if returned_all_results_?
+  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
+  // path more irregular.
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -672,30 +668,46 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
   }
 
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
-    // This backend execution has completed.
+    // This report made this backend done, so update the status and
+    // num_remaining_backends_.
+
+    // for now, abort the query if we see any error except if returned_all_results_ is
+    // true (UpdateStatus() initiates cancellation, if it hasn't already been)
+    // TODO: clarify control flow here, it's unclear we should even process this status
+    // report if returned_all_results_ is true
     bool is_fragment_failure;
     TUniqueId failed_instance_id;
     Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
-    int pending_backends = backend_exec_complete_barrier_->Notify();
-    if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
-      VLOG_QUERY << "Backend completed:"
-                 << " host=" << TNetworkAddressToString(backend_state->impalad_address())
-                 << " remaining=" << pending_backends
-                 << " query_id=" << PrintId(query_id());
+    if (!status.ok() && !returned_all_results_) {
+      Status ignored =
+          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
+              is_fragment_failure, failed_instance_id);
+      return Status::OK();
+    }
+
+    lock_guard<mutex> l(lock_);
+    DCHECK_GT(num_remaining_backends_, 0);
+    if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
+      VLOG_QUERY << "Backend completed: "
+          << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+          << " remaining=" << num_remaining_backends_ - 1
+          << " query_id=" << PrintId(query_id());
       BackendState::LogFirstInProgress(backend_states_);
     }
-    if (!status.ok()) {
-      // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than
-      // from within this RPC handler (since it can make RPCs).
-      discard_result(UpdateExecState(status,
-              is_fragment_failure ? &failed_instance_id : nullptr,
-              TNetworkAddressToString(backend_state->impalad_address())));
+    if (--num_remaining_backends_ == 0 || !status.ok()) {
+      backend_completion_cv_.NotifyAll();
     }
+    return Status::OK();
   }
   // If all results have been returned, return a cancelled status to force the fragment
   // instance to stop executing.
-  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
-  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
+  if (returned_all_results_) return Status::CANCELLED;
+
+  return Status::OK();
+}
+
+RuntimeState* Coordinator::runtime_state() {
+  return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
 }
 
 // TODO: add histogram/percentile
@@ -728,14 +740,20 @@ void Coordinator::ComputeQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
-    for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
+  for (BackendState* state: backend_states_) {
+    state->MergeErrorLog(&merged);
   }
   return PrintErrorMapToString(merged);
 }
 
 void Coordinator::ReleaseExecResources() {
+  lock_guard<mutex> l(lock_);
+  ReleaseExecResourcesLocked();
+}
+
+void Coordinator::ReleaseExecResourcesLocked() {
+  if (released_exec_resources_) return;
+  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -749,6 +767,8 @@ void Coordinator::ReleaseExecResources() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
+  // Need to protect against failed Prepare(), where root_sink() would not be set.
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // Now that we've released our own resources, can release query-wide resources.
   if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
@@ -756,20 +776,27 @@ void Coordinator::ReleaseExecResources() {
 }
 
 void Coordinator::ReleaseAdmissionControlResources() {
-  LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
+  lock_guard<mutex> l(lock_);
+  ReleaseAdmissionControlResourcesLocked();
+}
+
+void Coordinator::ReleaseAdmissionControlResourcesLocked() {
+  if (released_admission_control_resources_) return;
+  LOG(INFO) << "Release admission control resources for query_id="
+            << PrintId(query_ctx().query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
+  released_admission_control_resources_ = true;
   query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilter() called although runtime filters are disabled";
-  DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
+  DCHECK(exec_complete_barrier_.get() != nullptr)
       << "Filters received before fragments started!";
-
-  exec_rpcs_complete_barrier_->Wait();
+  exec_complete_barrier_->Wait();
   DCHECK(filter_routing_table_complete_)
       << "Filter received before routing table complete";
 
@@ -840,7 +867,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params.__set_dst_query_id(query_id());
   rpc_params.__set_filter_id(params.filter_id);
 
-  // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
   for (BackendState* bs: backend_states_) {
     for (int fragment_idx: target_fragment_idxs) {
       rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -914,19 +940,23 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
   }
 }
 
+const TUniqueId& Coordinator::query_id() const {
+  return query_ctx().query_id;
+}
+
 void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
   lock_guard<SpinLock> l(exec_summary_.lock);
   *exec_summary = exec_summary_.thrift_exec_summary;
 }
 
 MemTracker* Coordinator::query_mem_tracker() const {
-  return query_state_->query_mem_tracker();
+  return query_state()->query_mem_tracker();
 }
 
 void Coordinator::BackendsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
+    lock_guard<mutex> l(lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->ToJson(&val, doc);
@@ -939,7 +969,7 @@ void Coordinator::BackendsToJson(Document* doc) {
 void Coordinator::FInstanceStatsToJson(Document* doc) {
   Value states(kArrayType);
   {
-    lock_guard<SpinLock> l(backend_states_init_lock_);
+    lock_guard<mutex> l(lock_);
     for (BackendState* state : backend_states_) {
       Value val(kObjectType);
       state->InstanceStatsToJson(&val, doc);
@@ -949,14 +979,6 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
   doc->AddMember("backend_instances", states, doc->GetAllocator());
 }
 
-const TQueryCtx& Coordinator::query_ctx() const {
-  return schedule_.request().query_ctx;
-}
-
-const TUniqueId& Coordinator::query_id() const {
-  return query_ctx().query_id;
-}
-
 const TFinalizeParams* Coordinator::finalize_params() const {
   return schedule_.request().__isset.finalize_params
       ? &schedule_.request().finalize_params : nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36c9f26..723047b 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,15 +20,26 @@
 
 #include <string>
 #include <vector>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
+#include <boost/unordered_set.hpp>
 #include <rapidjson/document.h>
 
 #include "common/global-types.h"
+#include "common/hdfs.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
+#include "scheduling/query-schedule.h"
 #include "util/condition-variable.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -45,7 +56,6 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class QueryResultSet;
-class QuerySchedule;
 class MemTracker;
 class PlanRootSink;
 class FragmentInstanceState;
@@ -55,9 +65,10 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either by returning EOS, through
-/// client cancellation, returning an error, or by finalizing a DML request, the
-/// coordinator releases resources.
+/// query, including cancellation. Once a query ends, either through cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -66,80 +77,80 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Thread-safe except where noted.
-///
+/// Thread-safe, with the exception of GetNext().
+//
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
 /// 1. client: Exec()
 /// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
 /// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
 ///
-/// A query is considered to be executing until one of three things occurs:
-/// 1. An error is encountered. Backend cancellation is automatically initiated for all
-///    backends that haven't yet completed and the overall query status is set to the
-///    first (non-cancelled) encountered error status.
-/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
-///    is set to CANCELLED and cancellation is initiated for all backends still
-///    executing (without an error status).
-/// 3. The query has returned all rows. The overall query status is OK (and remains
-///    OK). Client cancellation is no longer possible and subsequent backend errors are
-///    ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
-///
-/// Lifecycle: this object must not be destroyed until after one of the three states
-/// above is reached (error, cancelled, or EOS) to ensure resources are released.
-///
-/// Lock ordering: (lower-numbered acquired before higher-numbered)
-/// 1. wait_lock_
-/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
-///    ExecSummary::lock (leafs)
+/// The implementation ensures that setting an overall error status and initiating
+/// cancellation of all fragment instances is atomic.
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
+/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
+/// TODO: clarify cancellation path; in particular, cancel as soon as we return
+/// all results
 class Coordinator { // NOLINT: The member variables could be re-ordered to save space
  public:
   Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
-  /// Initiate asynchronous execution of a query with the given schedule. When it
-  /// returns, all fragment instances have started executing at their respective
-  /// backends. Exec() must be called exactly once and a call to Exec() must precede
-  /// all other member function calls.
+  /// Initiate asynchronous execution of a query with the given schedule. When it returns,
+  /// all fragment instances have started executing at their respective backends.
+  /// A call to Exec() must precede all other member function calls.
   Status Exec() WARN_UNUSED_RESULT;
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
-  /// query doesn't return rows, until the query finishes or is cancelled. A call to
-  /// Wait() must precede all calls to GetNext().  Multiple calls to Wait() are
-  /// idempotent and it is okay to issue multiple Wait() calls concurrently.
+  /// query doesn't return rows, until the query finishes or is cancelled.
+  /// A call to Wait() must precede all calls to GetNext().
+  /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
+  /// Wait() calls concurrently.
   Status Wait() WARN_UNUSED_RESULT;
 
   /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
-  /// rows, but will not return more. If *eos is true, all rows have been returned.
-  /// Returns a non-OK status if an error was encountered either locally or by any of
-  /// the executing backends, or if the query was cancelled via Cancel().  After *eos
-  /// is true, subsequent calls to GetNext() will be a no-op.
+  /// rows, but will not return more.
+  ///
+  /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
+  /// no-op.
+  ///
+  /// GetNext() will not set *eos=true until all fragment instances have either completed
+  /// or have failed.
+  ///
+  /// Returns an error status if an error was encountered either locally or by any of the
+  /// remote fragments or if the query was cancelled.
   ///
   /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
-  /// calls.
+  /// calls (but may call any of the other member functions concurrently with GetNext()).
   Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
 
-  /// Cancel execution of query and sets the overall query status to CANCELLED if the
-  /// query is still executing. Idempotent.
-  void Cancel();
+  /// Cancel execution of query. This includes the execution of the local plan fragment,
+  /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
+  /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
+  /// Idempotent.
+  void Cancel(const Status* cause = nullptr);
 
-  /// Called by the report status RPC handler to update execution status of a
-  /// particular backend as well as dml_exec_state_ and the profile.
+  /// Updates execution status of a particular backend as well as dml_exec_state_.
+  /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+  /// error status.
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
+  /// Only valid to call after Exec().
+  QueryState* query_state() const { return query_state_; }
+
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
   RuntimeProfile* query_profile() const { return query_profile_; }
 
-  /// Safe to call only after Exec().
+  const TUniqueId& query_id() const;
+
   MemTracker* query_mem_tracker() const;
 
-  /// Safe to call only after Wait().
+  /// This is safe to call only after Wait()
   DmlExecState* dml_exec_state() { return &dml_exec_state_; }
 
   /// Return error log for coord and all the fragments. The error messages from the
@@ -148,6 +159,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   const ProgressUpdater& progress() { return progress_; }
 
+  /// Returns query_status_.
+  Status GetStatus();
+
   /// Get a copy of the current exec summary. Thread-safe.
   void GetTExecSummary(TExecSummary* exec_summary);
 
@@ -174,20 +188,18 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// owned by the ClientRequestState that owns this coordinator
   const QuerySchedule& schedule_;
 
-  /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
+  /// copied from TQueryExecRequest, governs when to call ReportQuerySummary
   TStmtType::type stmt_type_;
 
-  /// BackendStates for all execution backends, including the coordinator. All elements
-  /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
+  /// BackendStates for all execution backends, including the coordinator.
+  /// All elements are non-nullptr. Owned by obj_pool(). Populated by
+  /// InitBackendExec().
   std::vector<BackendState*> backend_states_;
 
-  /// Protects the population of backend_states_ vector (not the BackendState objects).
-  /// Used when accessing backend_states_ if it's not guaranteed that
-  /// InitBackendStates() has completed.
-  SpinLock backend_states_init_lock_;
+  // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment
+  int coord_backend_idx_ = -1;
 
-  /// The QueryState for this coordinator. Reference taken in Exec(). Reference
-  /// released in destructor.
+  /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
   QueryState* query_state_ = nullptr;
 
   /// Non-null if and only if the query produces results for the client; i.e. is of
@@ -198,10 +210,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Owned by the QueryState. Set in Exec().
+  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+  /// reference of QueryState released) in TearDown().
   FragmentInstanceState* coord_instance_ = nullptr;
 
-  /// Owned by the QueryState. Set in Exec().
+  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
+  /// GetNext() hits eos.
   PlanRootSink* coord_sink_ = nullptr;
 
   /// ensures single-threaded execution of Wait(). See lock ordering class comment.
@@ -209,17 +223,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
-  /// Keeps track of number of completed ranges and total scan ranges. Initialized by
-  /// Exec().
+  /// Keeps track of number of completed ranges and total scan ranges.
   ProgressUpdater progress_;
 
-  /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
-  RuntimeProfile* query_profile_ = nullptr;
-
-  /// Total time spent in finalization (typically 0 except for INSERT into hdfs
-  /// tables). Set in Exec().
-  RuntimeProfile::Counter* finalization_timer_ = nullptr;
-
   /// Total number of filter updates received (always 0 if filter mode is not
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -250,7 +256,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     void Init(const QuerySchedule& query_schedule);
   };
 
-  // Initialized by Exec().
   ExecSummary exec_summary_;
 
   /// Filled in as the query completes and tracks the results of DML queries.  This is
@@ -258,40 +263,52 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// coordinator fragment: only one of the two can legitimately produce updates.
   DmlExecState dml_exec_state_;
 
+  /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+  RuntimeProfile* query_profile_ = nullptr;
+
+  /// Protects all fields below. This is held while making RPCs, so this lock should
+  /// only be acquired if the acquiring thread is prepared to wait for a significant
+  /// time.
+  /// TODO: clarify to what extent the fields below need to be protected by lock_
+  /// Lock ordering is:
+  /// 1. wait_lock_
+  /// 2. lock_
+  /// 3. BackendState::lock_
+  /// 4. filter_lock_
+  boost::mutex lock_;
+
+  /// Overall status of the entire query; set to the first reported fragment error
+  /// status, or to CANCELLED if Cancel() is called.
+  Status query_status_;
+
+  /// If true, the query is done returning all results. It is possible that the
+  /// coordinator still needs to wait for cleanup on remote fragments (e.g. for
+  /// queries with a limit).
+  /// Once this is set to true, errors from execution backends are ignored.
+  bool returned_all_results_ = false;
+
+  /// If there is no coordinator fragment, Wait() simply waits until all
+  /// backends report completion by notifying on backend_completion_cv_.
+  /// Tied to lock_.
+  ConditionVariable backend_completion_cv_;
+
+  /// Number of backends for which done != true. When this hits 0, any thread
+  /// blocked in Wait() is notified.
+  int num_remaining_backends_ = 0;
+
   /// Event timeline for this query. Not owned.
   RuntimeProfile::EventSequence* query_events_ = nullptr;
 
-  /// Indexed by fragment idx (TPlanFragment.idx). Filled in
-  /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
-  /// sequentially, without synchronization.
+  /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats();
+  /// elements live in obj_pool().
   std::vector<FragmentStats*> fragment_stats_;
 
-  /// Barrier that is released when all calls to BackendState::Exec() have
-  /// returned. Initialized in StartBackendExec().
-  boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
-
-  /// Barrier that is released when all backends have indicated execution completion,
-  /// or when all backends are cancelled due to an execution error or client requested
-  /// cancellation. Initialized in StartBackendExec().
-  boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
-
-  SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
-
-  /// EXECUTING: in-flight; the only non-terminal state
-  /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
-  /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
-  /// ERROR: received an error from a backend
-  enum class ExecState {
-    EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
-  };
-  ExecState exec_state_ = ExecState::EXECUTING;
+  /// Total time spent in finalization (typically 0, except for INSERT into HDFS tables).
+  RuntimeProfile::Counter* finalization_timer_ = nullptr;
 
-  /// Overall execution status; only set on exec_state_ transitions:
-  /// - EXECUTING: OK
-  /// - RETURNED_RESULTS: OK
-  /// - CANCELLED: CANCELLED
-  /// - ERROR: error status
-  Status exec_status_;
+  /// Barrier that is released when all calls to ExecRemoteFragment() have
+  /// returned, successfully or not. Initialized during Exec().
+  boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
@@ -304,6 +321,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
+  /// True if and only if ReleaseExecResources() has been called.
+  bool released_exec_resources_ = false;
+
+  /// True if and only if ReleaseAdmissionControlResources() has been called.
+  bool released_admission_control_resources_ = false;
+
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
@@ -311,67 +334,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// HDFS INSERT finalization is not required.
   const TFinalizeParams* finalize_params() const;
 
-  const TQueryCtx& query_ctx() const;
+  const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
 
-  const TUniqueId& query_id() const;
+  /// Only valid *after* calling Exec(). Returns nullptr if the running query does not
+  /// produce any rows.
+  RuntimeState* runtime_state();
 
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
-  /// Called when the query is done executing due to reaching EOS or client
-  /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
-  /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
-  /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
-  /// either way, calls ComputeQuerySummary() and releases resources). Returns the
-  /// resulting overall execution status.
-  Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
-
-  /// Transitions 'exec_state_' given an execution status and returns the resulting
-  /// overall status:
-  ///
-  /// - if the 'status' parameter is ok, no state transition
-  /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
-  /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
-  ///   transition state (those are terminal states) however in the case of ERROR,
-  ///   status may be updated to a more interesting status.
-  ///
-  /// Should not be called for (client initiated) cancellation. Call
-  /// SetNonErrorTerminalState(CANCELLED) instead.
-  ///
-  /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
-  /// failure is not specific to a fragment instance), used for error reporting along
-  /// with 'instance_hostname'.
-  Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
-      const string& instance_hostname) WARN_UNUSED_RESULT;
-
-  /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller
-  /// transitioned to a terminal state (which happens exactly once for the lifetime of
-  /// the Coordinator object), then finalizes execution (cancels remaining backends if
-  /// transitioning to CANCELLED; in all cases releases resources and calls
-  /// ComputeQuerySummary().
-  void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
-
-  /// Return true if 'exec_state_' is RETURNED_RESULTS.
-  /// TODO: remove with IMPALA-6984.
-  bool ReturnedAllResults() WARN_UNUSED_RESULT;
-
-  /// Return the string representation of 'state'.
-  static const char* ExecStateToString(const ExecState state);
-
-  // For DCHECK_EQ, etc of ExecState values.
-  friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
-    return o << ExecStateToString(s);
-  }
-
-  /// Helper for HandleExecStateTransition(). Sends cancellation request to all
-  /// executing backends but does not wait for acknowledgement from the backends. The
-  /// ExecState state-machine ensures this is called at most once.
-  void CancelBackends();
-
-  /// Returns only when either all execution backends have reported success or a request
-  /// to cancel the backends has already been sent. It is safe to call this concurrently,
-  /// but any calls must be made only after Exec().
-  void WaitForBackends();
+  /// Cancels all fragment instances. Assumes that lock_ is held. This may also be
+  /// called when the query is not being cancelled, e.g. when the query limit is reached.
+  void CancelInternal();
+
+  /// Acquires lock_ and updates query_status_ with 'status' if query_status_ is not
+  /// already an error status, and returns the current query_status_. The status may be
+  /// due to an error in a specific fragment instance, or it can be a general error
+  /// not tied to a specific fragment instance.
+  /// Calls CancelInternal() when switching to an error status.
+  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
+  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
+  /// reporting. For a general error not tied to a specific instance,
+  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
+  /// 'backend_hostname' is used for error reporting in either case.
+  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
+      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
+
+  /// Returns only when either all execution backends have reported success or the query
+  /// is in error. Returns the status of the query.
+  /// It is safe to call this concurrently, but any calls must be made only after Exec().
+  Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
 
   /// Initializes fragment_stats_ and query_profile_. Must be called before
   /// InitBackendStates().
@@ -393,33 +385,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// finishing the INSERT in flight.
   Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
 
-  /// Helper for Exec(). Populates backend_states_, starts query execution at all
-  /// backends in parallel, and blocks until startup completes.
+  /// Populates backend_states_, starts query execution at all backends in parallel, and
+  /// blocks until startup completes.
   void StartBackendExec();
 
-  /// Helper for Exec(). Checks for errors encountered when starting backend execution,
-  /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram.
+  /// Calls CancelInternal() and returns an error if there was any error starting
+  /// backend execution.
+  /// Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Helper for HandleExecStateTransition(). Releases all resources associated with
-  /// query execution. The ExecState state-machine ensures this is called exactly once.
+  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
   void ReleaseExecResources();
 
-  /// Helper for HandleExecStateTransition(). Releases admission control resources for
-  /// use by other queries. This should only be called if one of following
-  /// preconditions is satisfied for each backend on which the query is executing:
-  ///
-  /// * The backend finished execution.  Rationale: the backend isn't consuming
-  ///   resources.
+  /// Same as ReleaseExecResources() except the lock must be held by the caller.
+  void ReleaseExecResourcesLocked();
+
+  /// Releases admission control resources for use by other queries.
+  /// This should only be called if one of following preconditions is satisfied for each
+  /// backend on which the query is executing:
+  /// * The backend finished execution.
+  ///   Rationale: the backend isn't consuming resources.
+  ///
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
   ///   time a newly admitted query fragment starts up on the backend and starts consuming
   ///   resources, the resources from this query will probably have been released.
+  ///
   /// * Sending the cancellation RPC to the backend failed
   ///   Rationale: the backend is either down or will tear itself down when it next tries
   ///   to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -430,13 +425,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   ///   pessimistically queueing queries while we wait for a response from a backend that
   ///   may never come.
   ///
-  /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
-  /// to satisfy the above preconditions. If the query has an expensive finalization
-  /// step post query execution (e.g. a DML statement), then this should be called
-  /// after that completes to avoid over-admitting queries.
+  /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
+  /// sufficient to satisfy the above preconditions. If the query has an expensive
+  /// finalization step post query execution (e.g. a DML statement), then this should
+  /// be called after that completes to avoid over-admitting queries.
   ///
-  /// The ExecState state-machine ensures this is called exactly once.
+  /// Acquires lock_. Idempotent.
   void ReleaseAdmissionControlResources();
+
+  /// Same as ReleaseAdmissionControlResources() except the lock must be held by the caller.
+  void ReleaseAdmissionControlResourcesLocked();
 };
 
 }
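
The comments above describe a first-error-wins protocol around query_status_:
UpdateStatus() merges an incoming status under lock_, ignores errors once results
have been returned, and triggers cancellation on the first error. A minimal sketch
of that contract follows; SimpleCoordinator and the Status struct are illustrative
stand-ins, not Impala's actual types (the real Coordinator uses boost::mutex and
impala::Status).

#include <mutex>
#include <string>

// Illustrative stand-in for impala::Status.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

class SimpleCoordinator {
 public:
  // Mirrors the documented contract: merge 'status' into the overall query
  // status under lock_, cancel on the first error, and return the current
  // overall status. Later errors, and all errors after results have been
  // fully returned, are ignored.
  Status UpdateStatus(const Status& status) {
    std::lock_guard<std::mutex> l(lock_);
    if (returned_all_results_ || !query_status_.ok()) return query_status_;
    if (!status.ok()) {
      query_status_ = status;  // First error wins.
      CancelInternal();        // Called with lock_ held, as documented above.
    }
    return query_status_;
  }

 private:
  void CancelInternal() { /* cancel all fragment instances; elided */ }

  std::mutex lock_;  // Protects the two fields below.
  Status query_status_;
  bool returned_all_results_ = false;
};

Holding lock_ across the whole merge is what makes first-error-wins atomic; the
comment above warns that the same lock is held across RPCs, so acquirers must be
prepared to wait.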

http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2ca1256..12b9b78 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
 
   // Cancel the parent query. 'lock_' should not be held because cancellation involves
   // RPCs and can block for a long time.
-  if (coord != NULL) coord->Cancel();
+  if (coord != NULL) coord->Cancel(cause);
   return Status::OK();
 }
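
The comment being updated here explains why 'lock_' is dropped before
coord->Cancel(): cancellation fans out RPCs and can block for a long time. A
minimal sketch of that copy-then-call pattern, with RequestState, Coordinator,
and Status as hypothetical stand-ins for the real classes:

#include <memory>
#include <mutex>

struct Status {
  static Status OK() { return Status(); }
};

// Hypothetical coordinator whose Cancel() issues RPCs and may block.
struct Coordinator {
  void Cancel(const Status* cause) { /* fan out cancellation RPCs; elided */ }
};

class RequestState {
 public:
  Status Cancel(const Status* cause) {
    Coordinator* coord;
    {
      std::lock_guard<std::mutex> l(lock_);
      coord = coord_.get();  // Read the pointer under the lock...
    }
    // ...then make the slow, RPC-bound call with the lock released, so other
    // threads operating on this request state are not blocked behind it.
    if (coord != nullptr) coord->Cancel(cause);
    return Status::OK();
  }

 private:
  std::mutex lock_;
  std::unique_ptr<Coordinator> coord_;
};

This pattern assumes the coordinator outlives the unlocked call, which holds
here because the ClientRequestState owns its Coordinator.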
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 3af4c9b..fb3f261 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,6 +111,11 @@ class ClientRequestState;
 /// 6. ClientRequestState::expiration_data_lock_
 /// 7. Coordinator::exec_summary_lock
 ///
+/// Coordinator::lock_ should not be acquired at the same time as the
+/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
+/// Coordinator::exec_summary_lock_ the Coordinator's lock ordering is independent of
+/// the above lock ordering.
+///
 /// The following locks are not held in conjunction with other locks:
 /// * query_log_lock_
 /// * session_timeout_lock_
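
The ordering rules above exist to rule out lock-order-inversion deadlocks: if
one thread takes A then B while another takes B then A, each can end up holding
the lock the other needs. A small sketch of the discipline; the mutex names are
illustrative, not the actual members listed above:

#include <mutex>

std::mutex session_lock_;  // Order 1 in this sketch's hierarchy.
std::mutex query_lock_;    // Order 2.

void UpdateSessionAndQuery() {
  // Every path that needs both locks acquires them in the documented order,
  // so no two threads can each hold the other's first lock.
  std::lock_guard<std::mutex> l1(session_lock_);
  std::lock_guard<std::mutex> l2(query_lock_);
  // ... mutate state guarded by both locks ...
}

void QueryOnlyOperation() {
  // Taking only a later lock is fine; what the ordering forbids is acquiring
  // an earlier lock while already holding a later one.
  std::lock_guard<std::mutex> l(query_lock_);
  // ...
}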

http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 827c526..49b0bde 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,23 +33,8 @@ class CountingBarrier {
   }
 
   /// Sends one notification, decrementing the number of pending notifications by one.
-  /// Returns the remaining pending notifications.
-  int32_t Notify() {
-    int32_t result = count_.Add(-1);
-    if (result == 0) promise_.Set(true);
-    return result;
-  }
-
-  /// Sets the number of pending notifications to 0 and unblocks Wait().
-  void NotifyRemaining() {
-    while (true) {
-      int32_t value = count_.Load();
-      if (value <= 0) return;  // count_ can legitimately drop below 0
-      if (count_.CompareAndSwap(value, 0)) {
-        promise_.Set(true);
-        return;
-      }
-    }
+  void Notify() {
+    if (count_.Add(-1) == 0) promise_.Set(true);
   }
 
   /// Blocks until all notifications are received.
@@ -59,8 +44,6 @@ class CountingBarrier {
   /// case '*timed_out' will be true.
   void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
 
-  int32_t pending() const { return count_.Load(); }
-
  private:
   /// Used to signal waiters when all notifications are received.
   Promise<bool> promise_;
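
With NotifyRemaining() and the Notify() return value gone, CountingBarrier is
reduced to "decrement a count, wake waiters at zero". A self-contained sketch
of those semantics using standard-library primitives (the real class is built
on AtomicInt32 and a Promise; this is an illustration, not the actual
implementation):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Minimal counting barrier: Notify() decrements the pending count; Wait()
// blocks until it reaches zero.
class CountingBarrier {
 public:
  explicit CountingBarrier(int count) : count_(count) {}

  void Notify() {
    std::lock_guard<std::mutex> l(lock_);
    if (--count_ == 0) done_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> l(lock_);
    done_.wait(l, [this] { return count_ <= 0; });
  }

 private:
  std::mutex lock_;
  std::condition_variable done_;
  int count_;
};

int main() {
  constexpr int kNumBackends = 4;
  CountingBarrier barrier(kNumBackends);
  std::vector<std::thread> backends;
  for (int i = 0; i < kNumBackends; ++i) {
    // Each "backend" sends exactly one notification, e.g. when its Exec()
    // RPC returns.
    backends.emplace_back([&barrier] { barrier.Notify(); });
  }
  barrier.Wait();  // Unblocks once all notifications have arrived.
  for (std::thread& t : backends) t.join();
  std::cout << "all backends reported\n";
  return 0;
}

This is roughly how exec_complete_barrier_ in the coordinator diff above is
used: initialized to the number of backends, notified once per returned Exec()
RPC, and waited on until startup completes.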