You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/05/16 15:21:51 UTC
[1/3] impala git commit: IMPALA-6645: Enable disk spill encryption by default
Repository: impala
Updated Branches:
refs/heads/2.x 0ce9056a1 -> bee907c5f
IMPALA-6645: Enable disk spill encryption by default
Perf:
Targeted benchmarks with a heavily spilling query on a machine
with PCLMULQDQ support show < 5% of CPU time spent in encryption and
decryption. PCLMULQDQ was introduced in AMD Bulldozer (c. 2011)
and Intel Westmere (c. 2010).
Testing:
Ran core tests with the change.
Updated the custom cluster test to exercise the non-default
configuration.
Change-Id: Iee4be2a95d689f66c3663d99e4df0fb3968893a9
Reviewed-on: http://gerrit.cloudera.org:8080/10345
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e4fcc31c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e4fcc31c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e4fcc31c
Branch: refs/heads/2.x
Commit: e4fcc31cf1385fa99c06a04c0c4edecfbf5dbd95
Parents: 0ce9056
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 11 11:00:35 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 16 00:53:41 2018 +0000
----------------------------------------------------------------------
be/src/runtime/tmp-file-mgr.cc | 2 +-
.../queries/QueryTest/basic-spilling.test | 16 +++++++++
.../QueryTest/disk-spill-encryption.test | 15 ---------
.../test_disk_spill_configurations.py | 34 ++++++++++++++++++++
.../test_disk_spill_encryption.py | 32 ------------------
5 files changed, 51 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 04e15d4..b995518 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -38,7 +38,7 @@
#include "common/names.h"
-DEFINE_bool(disk_spill_encryption, false,
+DEFINE_bool(disk_spill_encryption, true,
"Set this to encrypt and perform an integrity "
"check on all data spilled to disk during a query");
DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test b/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
new file mode 100644
index 0000000..513ba2c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/basic-spilling.test
@@ -0,0 +1,16 @@
+====
+---- QUERY
+# A basic spilling query to exercise spill-to-disk end-to-end.
+set buffer_pool_limit=90m;
+set default_spillable_buffer_size=64k;
+select count(*)
+from (select distinct o_orderdate, o_custkey, o_comment
+ from tpch_parquet.orders) v;
+---- RESULTS
+1500000
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+# Verify that spilling was activated.
+row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
+====
http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test b/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
deleted file mode 100644
index 48649e5..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/disk-spill-encryption.test
+++ /dev/null
@@ -1,15 +0,0 @@
-====
----- QUERY
-set buffer_pool_limit=90m;
-set default_spillable_buffer_size=64k;
-select count(*)
-from (select distinct o_orderdate, o_custkey, o_comment
- from tpch_parquet.orders) v;
----- RESULTS
-1500000
----- TYPES
-BIGINT
----- RUNTIME_PROFILE
-# Verify that spilling was activated.
-row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
-====
http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/tests/custom_cluster/test_disk_spill_configurations.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_disk_spill_configurations.py b/tests/custom_cluster/test_disk_spill_configurations.py
new file mode 100644
index 0000000..efddd23
--- /dev/null
+++ b/tests/custom_cluster/test_disk_spill_configurations.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestDiskSpillConfigurations(CustomClusterTestSuite):
+ """Tests to exercise non-default disk spill configurations end-to-end."""
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args("--disk_spill_encryption=false")
+ def test_disk_spill_encryption_disabled(self, vector):
+ """Disk spill encryption is enabled by default. We only need a custom cluster to test
+ the non-default configuration."""
+ self.run_test_case('QueryTest/basic-spilling', vector)
http://git-wip-us.apache.org/repos/asf/impala/blob/e4fcc31c/tests/custom_cluster/test_disk_spill_encryption.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_disk_spill_encryption.py b/tests/custom_cluster/test_disk_spill_encryption.py
deleted file mode 100644
index c9a5aeb..0000000
--- a/tests/custom_cluster/test_disk_spill_encryption.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pytest
-
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-
-class TestDiskSpillEncryption(CustomClusterTestSuite):
- """ Tests to exercise disk spill encryption end-to-end. """
-
- @classmethod
- def get_workload(self):
- return 'functional-query'
-
- @pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args("--disk_spill_encryption=true")
- def test_spilling_query(self, vector):
- self.run_test_case('QueryTest/disk-spill-encryption', vector)
[3/3] impala git commit: IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row
Posted by mi...@apache.org.
IMPALA-7022: TestQueries.test_subquery: Subquery must not return more than one row
TestQueries.test_subquery sometimes fails during exhaustive
tests.
In the tests we expect to catch an exception that is
prefixed by the "Query aborted:" string. The prefix is
usually added by impala_beeswax.py::wait_for_completion(),
but in rare cases it isn't added.
From the test's point of view it is irrelevant whether the exception
is prefixed with "Query aborted:" or not, so I removed the prefix
from the expected exception string.
Change-Id: I3b8655ad273b1dd7a601099f617db609e4a4797b
Reviewed-on: http://gerrit.cloudera.org:8080/10407
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bee907c5
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bee907c5
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bee907c5
Branch: refs/heads/2.x
Commit: bee907c5fce6a7f3499b45bd13aa7815ccbb27d8
Parents: 0e6117b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Tue May 15 15:47:29 2018 +0200
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 16 10:10:18 2018 +0000
----------------------------------------------------------------------
.../functional-query/queries/QueryTest/subquery.test | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/bee907c5/testdata/workloads/functional-query/queries/QueryTest/subquery.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/subquery.test b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
index 2d691ed..df1be9a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/subquery.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/subquery.test
@@ -929,7 +929,7 @@ WHERE int_col =
ORDER BY id
---- RESULTS
---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
====
---- QUERY
# Uncorrelated subquery in arithmetic expr that returns multiple rows
@@ -940,7 +940,7 @@ WHERE int_col =
ORDER BY id
---- RESULTS
---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
====
---- QUERY
# Uncorrelated subquery in binary predicate that returns scalar value at runtime
@@ -1000,7 +1000,7 @@ SELECT a FROM (values(1 a),(2),(3)) v
WHERE a = (SELECT x FROM (values(1 x),(2),(3)) v)
---- RESULTS
---- CATCH
-Query aborted:Subquery must not return more than one row:
+Subquery must not return more than one row:
====
---- QUERY
# Subquery that returns more than one row
@@ -1009,7 +1009,7 @@ SELECT id FROM functional.alltypes
WHERE id = (SELECT bigint_col FROM functional.alltypes_view)
---- RESULTS
---- CATCH
-Query aborted:Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
+Subquery must not return more than one row: SELECT bigint_col FROM functional.alltypes_view
====
---- QUERY
# Runtime scalar subquery with offset.
[2/3] impala git commit: IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash
Posted by mi...@apache.org.
IMPALA-7033/IMPALA-7030: Backout suspected change leading to crash
Revert "IMPALA-5384, part 2: Simplify Coordinator locking and clarify state"
This reverts commit 6ca87e46736a1e591ed7d7d5fee05b4b4d2fbb50.
Change-Id: Idc63006e6e04130b2873a6a9730e434c563327c5
Reviewed-on: http://gerrit.cloudera.org:8080/10412
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/10423
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0e6117be
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0e6117be
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0e6117be
Branch: refs/heads/2.x
Commit: 0e6117be3be8475ea1aa727dec55979cbffd05b7
Parents: e4fcc31c
Author: Dan Hecht <dh...@cloudera.com>
Authored: Tue May 15 11:36:15 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Wed May 16 09:25:31 2018 +0000
----------------------------------------------------------------------
be/src/runtime/coordinator-backend-state.h | 8 -
be/src/runtime/coordinator.cc | 424 +++++++++++++-----------
be/src/runtime/coordinator.h | 330 +++++++++---------
be/src/service/client-request-state.cc | 2 +-
be/src/service/impala-server.h | 5 +
be/src/util/counting-barrier.h | 21 +-
6 files changed, 395 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e7af2e2..d2f122c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -21,17 +21,9 @@
#include <vector>
#include <unordered_set>
-#include <boost/accumulators/accumulators.hpp>
-#include <boost/accumulators/statistics/max.hpp>
-#include <boost/accumulators/statistics/mean.hpp>
-#include <boost/accumulators/statistics/median.hpp>
-#include <boost/accumulators/statistics/min.hpp>
-#include <boost/accumulators/statistics/stats.hpp>
-#include <boost/accumulators/statistics/variance.hpp>
#include <boost/thread/mutex.hpp>
#include "runtime/coordinator.h"
-#include "scheduling/query-schedule.h"
#include "util/progress-updater.h"
#include "util/stopwatch.h"
#include "util/runtime-profile.h"
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7e12f94..a423de8 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -25,7 +25,6 @@
#include <boost/algorithm/string.hpp>
#include <gutil/strings/substitute.h>
-#include "common/hdfs.h"
#include "exec/data-sink.h"
#include "exec/plan-root-sink.h"
#include "gen-cpp/ImpalaInternalService.h"
@@ -40,7 +39,6 @@
#include "runtime/query-state.h"
#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
-#include "scheduling/query-schedule.h"
#include "util/bloom-filter.h"
#include "util/counting-barrier.h"
#include "util/hdfs-bulk-ops.h"
@@ -53,13 +51,16 @@
using namespace apache::thrift;
using namespace rapidjson;
+using namespace strings;
using boost::algorithm::iequals;
using boost::algorithm::is_any_of;
using boost::algorithm::join;
using boost::algorithm::token_compress_on;
using boost::algorithm::split;
using boost::filesystem::path;
+using std::unique_ptr;
+DECLARE_int32(be_port);
DECLARE_string(hostname);
using namespace impala;
@@ -75,9 +76,11 @@ Coordinator::Coordinator(
query_events_(events) {}
Coordinator::~Coordinator() {
- // Must have entered a terminal exec state guaranteeing resources were released.
- DCHECK_NE(exec_state_, ExecState::EXECUTING);
- // Release the coordinator's reference to the query control structures.
+ DCHECK(released_exec_resources_)
+ << "ReleaseExecResources() must be called before Coordinator is destroyed";
+ DCHECK(released_admission_control_resources_)
+ << "ReleaseAdmissionControlResources() must be called before Coordinator is "
+ << "destroyed";
if (query_state_ != nullptr) {
ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
}
@@ -106,6 +109,12 @@ Status Coordinator::Exec() {
bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
+ // to keep things simple, make async Cancel() calls wait until plan fragment
+ // execution has been initiated, otherwise we might try to cancel fragment
+ // execution at Impala daemons where it hasn't even started
+ // TODO: revisit this, it may not be true anymore
+ lock_guard<mutex> l(lock_);
+
query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
@@ -129,9 +138,9 @@ Status Coordinator::Exec() {
InitFilterRoutingTable();
}
- // At this point, all static setup is done and all structures are initialized. Only
- // runtime-related state changes past this point (examples: fragment instance
- // profiles, etc.)
+ // At this point, all static setup is done and all structures are initialized.
+ // Only runtime-related state changes past this point (examples:
+ // num_remaining_backends_, fragment instance profiles, etc.)
StartBackendExec();
RETURN_IF_ERROR(FinishBackendStartup());
@@ -146,7 +155,7 @@ Status Coordinator::Exec() {
// which means we failed Prepare
Status prepare_status = query_state_->WaitForPrepare();
DCHECK(!prepare_status.ok());
- return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
+ return prepare_status;
}
// When GetFInstanceState() returns the coordinator instance, the Prepare phase
@@ -160,6 +169,7 @@ Status Coordinator::Exec() {
coord_sink_ = coord_instance_->root_sink();
DCHECK(coord_sink_ != nullptr);
}
+
return Status::OK();
}
@@ -198,8 +208,6 @@ void Coordinator::InitFragmentStats() {
void Coordinator::InitBackendStates() {
int num_backends = schedule_.per_backend_exec_params().size();
DCHECK_GT(num_backends, 0);
-
- lock_guard<SpinLock> l(backend_states_init_lock_);
backend_states_.resize(num_backends);
RuntimeProfile::Counter* num_backends_counter =
@@ -207,13 +215,19 @@ void Coordinator::InitBackendStates() {
num_backends_counter->Set(num_backends);
// create BackendStates
+ bool has_coord_fragment = schedule_.GetCoordFragment() != nullptr;
+ const TNetworkAddress& coord_address = ExecEnv::GetInstance()->backend_address();
int backend_idx = 0;
for (const auto& entry: schedule_.per_backend_exec_params()) {
+ if (has_coord_fragment && coord_address == entry.first) {
+ coord_backend_idx_ = backend_idx;
+ }
BackendState* backend_state = obj_pool()->Add(
new BackendState(query_id(), backend_idx, filter_mode_));
backend_state->Init(entry.second, fragment_stats_, obj_pool());
backend_states_[backend_idx++] = backend_state;
}
+ DCHECK(!has_coord_fragment || coord_backend_idx_ != -1);
}
void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -327,8 +341,8 @@ void Coordinator::InitFilterRoutingTable() {
void Coordinator::StartBackendExec() {
int num_backends = backend_states_.size();
- exec_rpcs_complete_barrier_.reset(new CountingBarrier(num_backends));
- backend_exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+ exec_complete_barrier_.reset(new CountingBarrier(num_backends));
+ num_remaining_backends_ = num_backends;
DebugOptions debug_options(schedule_.query_options());
@@ -340,11 +354,11 @@ void Coordinator::StartBackendExec() {
ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
[backend_state, this, &debug_options]() {
backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
- exec_rpcs_complete_barrier_.get());
+ exec_complete_barrier_.get());
});
}
- exec_rpcs_complete_barrier_->Wait();
+ exec_complete_barrier_->Wait();
VLOG_QUERY << "started execution on " << num_backends << " backends for query_id="
<< PrintId(query_id());
query_events_->MarkEvent(
@@ -353,24 +367,26 @@ void Coordinator::StartBackendExec() {
}
Status Coordinator::FinishBackendStartup() {
+ Status status = Status::OK();
const TMetricDef& def =
MakeTMetricDef("backend-startup-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
// Capture up to 30 minutes of start-up times, in ms, with 4 s.f. accuracy.
HistogramMetric latencies(def, 30 * 60 * 1000, 4);
- Status status = Status::OK();
- string error_hostname;
for (BackendState* backend_state: backend_states_) {
// preserve the first non-OK, if there is one
Status backend_status = backend_state->GetStatus();
- if (!backend_status.ok() && status.ok()) {
- status = backend_status;
- error_hostname = backend_state->impalad_address().hostname;
- }
+ if (!backend_status.ok() && status.ok()) status = backend_status;
latencies.Update(backend_state->rpc_latency());
}
+
query_profile_->AddInfoString(
"Backend startup latencies", latencies.ToHumanReadable());
- return UpdateExecState(status, nullptr, error_hostname);
+
+ if (!status.ok()) {
+ query_status_ = status;
+ CancelInternal();
+ }
+ return status;
}
string Coordinator::FilterDebugString() {
@@ -430,115 +446,40 @@ string Coordinator::FilterDebugString() {
return Substitute("\n$0", table_printer.ToString());
}
-const char* Coordinator::ExecStateToString(const ExecState state) {
- static const unordered_map<ExecState, const char *> exec_state_to_str{
- {ExecState::EXECUTING, "EXECUTING"},
- {ExecState::RETURNED_RESULTS, "RETURNED_RESULTS"},
- {ExecState::CANCELLED, "CANCELLED"},
- {ExecState::ERROR, "ERROR"}};
- return exec_state_to_str.at(state);
+Status Coordinator::GetStatus() {
+ lock_guard<mutex> l(lock_);
+ return query_status_;
}
-Status Coordinator::SetNonErrorTerminalState(const ExecState state) {
- DCHECK(state == ExecState::RETURNED_RESULTS || state == ExecState::CANCELLED);
- Status ret_status;
- {
- lock_guard<SpinLock> l(exec_state_lock_);
- // May have already entered a terminal state, in which case nothing to do.
- if (exec_state_ != ExecState::EXECUTING) return exec_status_;
- DCHECK(exec_status_.ok()) << exec_status_;
- exec_state_ = state;
- if (state == ExecState::CANCELLED) exec_status_ = Status::CANCELLED;
- ret_status = exec_status_;
- }
- VLOG_QUERY << Substitute("ExecState: query id=$0 execution $1", PrintId(query_id()),
- state == ExecState::CANCELLED ? "cancelled" : "completed");
- HandleExecStateTransition(ExecState::EXECUTING, state);
- return ret_status;
-}
-
-Status Coordinator::UpdateExecState(const Status& status,
- const TUniqueId* failed_finst, const string& instance_hostname) {
- Status ret_status;
- ExecState old_state, new_state;
+Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+ bool is_fragment_failure, const TUniqueId& instance_id) {
{
- lock_guard<SpinLock> l(exec_state_lock_);
- old_state = exec_state_;
- if (old_state == ExecState::EXECUTING) {
- DCHECK(exec_status_.ok()) << exec_status_;
- if (!status.ok()) {
- // Error while executing - go to ERROR state.
- exec_status_ = status;
- exec_state_ = ExecState::ERROR;
- }
- } else if (old_state == ExecState::RETURNED_RESULTS) {
- // Already returned all results. Leave exec status as ok, stay in this state.
- DCHECK(exec_status_.ok()) << exec_status_;
- } else if (old_state == ExecState::CANCELLED) {
- // Client requested cancellation already, stay in this state. Ignores errors
- // after requested cancellations.
- DCHECK(exec_status_.IsCancelled()) << exec_status_;
- } else {
- // Already in the ERROR state, stay in this state but update status to be the
- // first non-cancelled status.
- DCHECK_EQ(old_state, ExecState::ERROR);
- DCHECK(!exec_status_.ok());
- if (!status.ok() && !status.IsCancelled() && exec_status_.IsCancelled()) {
- exec_status_ = status;
- }
- }
- new_state = exec_state_;
- ret_status = exec_status_;
- }
- // Log interesting status: a non-cancelled error or a cancellation if was executing.
- if (!status.ok() && (!status.IsCancelled() || old_state == ExecState::EXECUTING)) {
- VLOG_QUERY << Substitute(
- "ExecState: query id=$0 finstance=$1 on host=$2 ($3 -> $4) status=$5",
- PrintId(query_id()), failed_finst != nullptr ? PrintId(*failed_finst) : "N/A",
- instance_hostname, ExecStateToString(old_state), ExecStateToString(new_state),
- status.GetDetail());
- }
- // After dropping the lock, apply the state transition (if any) side-effects.
- HandleExecStateTransition(old_state, new_state);
- return ret_status;
-}
-
-bool Coordinator::ReturnedAllResults() {
- lock_guard<SpinLock> l(exec_state_lock_);
- return exec_state_ == ExecState::RETURNED_RESULTS;
-}
-
-void Coordinator::HandleExecStateTransition(
- const ExecState old_state, const ExecState new_state) {
- static const unordered_map<ExecState, const char *> exec_state_to_event{
- {ExecState::EXECUTING, "Executing"},
- {ExecState::RETURNED_RESULTS, "Last row fetched"},
- {ExecState::CANCELLED, "Execution cancelled"},
- {ExecState::ERROR, "Execution error"}};
- if (old_state == new_state) return;
- // Once we enter a terminal state, we stay there, guaranteeing this code runs only once.
- DCHECK_EQ(old_state, ExecState::EXECUTING);
- // Should never transition to the initial state.
- DCHECK_NE(new_state, ExecState::EXECUTING);
-
- query_events_->MarkEvent(exec_state_to_event.at(new_state));
- // TODO: IMPALA-7011 is this needed? This will also happen on the "backend" side of
- // cancel rpc. And in the case of EOS, sink already knows this.
- if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
- // This thread won the race to transitioning into a terminal state - terminate
- // execution and release resources.
- ReleaseExecResources();
- if (new_state == ExecState::RETURNED_RESULTS) {
- // TODO: IMPALA-6984: cancel all backends in this case too.
- WaitForBackends();
+ lock_guard<mutex> l(lock_);
+
+ // The query is done and we are just waiting for backends to clean up.
+ // Ignore their cancelled updates.
+ if (returned_all_results_ && status.IsCancelled()) return query_status_;
+
+ // nothing to update
+ if (status.ok()) return query_status_;
+
+ // don't override an error status; also, cancellation has already started
+ if (!query_status_.ok()) return query_status_;
+
+ query_status_ = status;
+ CancelInternal();
+ }
+
+ if (is_fragment_failure) {
+ // Log the id of the fragment that first failed so we can track it down more easily.
+ VLOG_QUERY << "query_id=" << PrintId(query_id())
+ << " failed because fragment_instance_id=" << PrintId(instance_id)
+ << " on host=" << backend_hostname << " failed.";
} else {
- CancelBackends();
+ VLOG_QUERY << "query_id=" << PrintId(query_id()) << " failed due to error on host="
+ << backend_hostname;
}
- ReleaseAdmissionControlResources();
- // Can compute summary only after we stop accepting reports from the backends. Both
- // WaitForBackends() and CancelBackends() ensures that.
- // TODO: should move this off of the query execution path?
- ComputeQuerySummary();
+ return query_status_;
}
Status Coordinator::FinalizeHdfsInsert() {
@@ -550,7 +491,7 @@ Status Coordinator::FinalizeHdfsInsert() {
VLOG_QUERY << "Finalizing query: " << PrintId(query_id());
SCOPED_TIMER(finalization_timer_);
- Status return_status = UpdateExecState(Status::OK(), nullptr, FLAGS_hostname);
+ Status return_status = GetStatus();
if (return_status.ok()) {
HdfsTableDescriptor* hdfs_table;
RETURN_IF_ERROR(DescriptorTbl::CreateHdfsTblDescriptor(query_ctx().desc_tbl,
@@ -576,13 +517,22 @@ Status Coordinator::FinalizeHdfsInsert() {
return return_status;
}
-void Coordinator::WaitForBackends() {
- int32_t num_remaining = backend_exec_complete_barrier_->pending();
- if (num_remaining > 0) {
- VLOG_QUERY << "Coordinator waiting for backends to finish, " << num_remaining
- << " remaining. query_id=" << PrintId(query_id());
- backend_exec_complete_barrier_->Wait();
+Status Coordinator::WaitForBackendCompletion() {
+ unique_lock<mutex> l(lock_);
+ while (num_remaining_backends_ > 0 && query_status_.ok()) {
+ VLOG_QUERY << "Coordinator waiting for backends to finish, "
+ << num_remaining_backends_ << " remaining. query_id="
+ << PrintId(query_id());
+ backend_completion_cv_.Wait(l);
}
+ if (query_status_.ok()) {
+ VLOG_QUERY << "All backends finished successfully. query_id=" << PrintId(query_id());
+ } else {
+ VLOG_QUERY << "All backends finished due to one or more errors. query_id="
+ << PrintId(query_id()) << ". " << query_status_.GetDetail();
+ }
+
+ return query_status_;
}
Status Coordinator::Wait() {
@@ -593,22 +543,34 @@ Status Coordinator::Wait() {
if (stmt_type_ == TStmtType::QUERY) {
DCHECK(coord_instance_ != nullptr);
- return UpdateExecState(coord_instance_->WaitForOpen(),
- &coord_instance_->runtime_state()->fragment_instance_id(), FLAGS_hostname);
+ return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
+ runtime_state()->fragment_instance_id());
}
+
DCHECK_EQ(stmt_type_, TStmtType::DML);
- // DML finalization can only happen when all backends have completed all side-effects
- // and reported relevant state.
- WaitForBackends();
- if (finalize_params() != nullptr) {
- RETURN_IF_ERROR(UpdateExecState(
- FinalizeHdfsInsert(), nullptr, FLAGS_hostname));
- }
- // DML requests are finished at this point.
- RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+ // Query finalization can only happen when all backends have reported relevant
+ // state. They only have relevant state to report in the parallel INSERT case,
+ // otherwise all the relevant state is from the coordinator fragment which will be
+ // available after Open() returns. Ignore the returned status if finalization is
+ // required., since FinalizeHdfsInsert() will pick it up and needs to execute
+ // regardless.
+ Status status = WaitForBackendCompletion();
+ if (finalize_params() == nullptr && !status.ok()) return status;
+
+ // Execution of query fragments has finished. We don't need to hold onto query execution
+ // resources while we finalize the query.
+ ReleaseExecResources();
+ // Query finalization is required only for HDFS table sinks
+ if (finalize_params() != nullptr) RETURN_IF_ERROR(FinalizeHdfsInsert());
+ // Release admission control resources after we'd done the potentially heavyweight
+ // finalization.
+ ReleaseAdmissionControlResources();
+
query_profile_->AddInfoString(
"DML Stats", dml_exec_state_.OutputPartitionStats("\n"));
- return Status::OK();
+ // For DML queries, when Wait is done, the query is complete.
+ ComputeQuerySummary();
+ return status;
}
Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
@@ -616,54 +578,88 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
DCHECK(has_called_wait_);
SCOPED_TIMER(query_profile_->total_time_counter());
- // exec_state_lock_ not needed here though since this path won't execute concurrently
- // with itself or DML finalization.
- if (exec_state_ == ExecState::RETURNED_RESULTS) {
- // Nothing left to do: already in a terminal state and no more results.
+ if (returned_all_results_) {
+ // May be called after the first time we set *eos. Re-set *eos and return here;
+ // already torn-down coord_sink_ so no more work to do.
*eos = true;
return Status::OK();
}
- DCHECK(coord_instance_ != nullptr) << "Exec() should be called first";
- DCHECK(coord_sink_ != nullptr) << "Exec() should be called first";
- RuntimeState* runtime_state = coord_instance_->runtime_state();
- Status status = coord_sink_->GetNext(runtime_state, results, max_rows, eos);
- RETURN_IF_ERROR(UpdateExecState(
- status, &runtime_state->fragment_instance_id(), FLAGS_hostname));
- if (*eos) RETURN_IF_ERROR(SetNonErrorTerminalState(ExecState::RETURNED_RESULTS));
+ DCHECK(coord_sink_ != nullptr)
+ << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
+ << "checked?";
+ Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
+
+ // if there was an error, we need to return the query's error status rather than
+ // the status we just got back from the local executor (which may well be CANCELLED
+ // in that case). Coordinator fragment failed in this case so we log the query_id.
+ RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
+ runtime_state()->fragment_instance_id()));
+
+ if (*eos) {
+ returned_all_results_ = true;
+ query_events_->MarkEvent("Last row fetched");
+ // release query execution resources here, since we won't be fetching more result rows
+ ReleaseExecResources();
+ // wait for all backends to complete before computing the summary
+ // TODO: relocate this so GetNext() won't have to wait for backends to complete?
+ RETURN_IF_ERROR(WaitForBackendCompletion());
+ // Release admission control resources after backends are finished.
+ ReleaseAdmissionControlResources();
+ // if the query completed successfully, compute the summary
+ if (query_status_.ok()) ComputeQuerySummary();
+ }
+
return Status::OK();
}
-void Coordinator::Cancel() {
- // Illegal to call Cancel() before Exec() returns, so there's no danger of the cancel
- // RPC passing the exec RPC.
- DCHECK(exec_rpcs_complete_barrier_ != nullptr &&
- exec_rpcs_complete_barrier_->pending() <= 0) << "Exec() must be called first";
- discard_result(SetNonErrorTerminalState(ExecState::CANCELLED));
+void Coordinator::Cancel(const Status* cause) {
+ lock_guard<mutex> l(lock_);
+ // if the query status indicates an error, cancellation has already been initiated;
+ // prevent others from cancelling a second time
+ if (!query_status_.ok()) return;
+
+ // TODO: This should default to OK(), not CANCELLED if there is no cause (or callers
+ // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
+ // of a successful query. Need to clean up relationship between query_status_ here and
+ // in QueryExecState. See IMPALA-4279.
+ query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
+ CancelInternal();
}
-void Coordinator::CancelBackends() {
+void Coordinator::CancelInternal() {
+ VLOG_QUERY << "Cancel() query_id=" << PrintId(query_id());
+ // TODO: remove when restructuring cancellation, which should happen automatically
+ // as soon as the coordinator knows that the query is finished
+ DCHECK(!query_status_.ok());
+
int num_cancelled = 0;
for (BackendState* backend_state: backend_states_) {
DCHECK(backend_state != nullptr);
if (backend_state->Cancel()) ++num_cancelled;
}
- backend_exec_complete_barrier_->NotifyRemaining();
-
VLOG_QUERY << Substitute(
"CancelBackends() query_id=$0, tried to cancel $1 backends",
PrintId(query_id()), num_cancelled);
+ backend_completion_cv_.NotifyAll();
+
+ ReleaseExecResourcesLocked();
+ ReleaseAdmissionControlResourcesLocked();
+ // Report the summary with whatever progress the query made before being cancelled.
+ ComputeQuerySummary();
}
Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
- VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
- << " backend_idx=" << params.coord_state_idx;
+ VLOG_FILE << "UpdateBackendExecStatus() backend_idx=" << params.coord_state_idx;
if (params.coord_state_idx >= backend_states_.size()) {
return Status(TErrorCode::INTERNAL_ERROR,
Substitute("Unknown backend index $0 (max known: $1)",
params.coord_state_idx, backend_states_.size() - 1));
}
BackendState* backend_state = backend_states_[params.coord_state_idx];
+ // TODO: return here if returned_all_results_?
+ // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
+ // path more irregular.
// TODO: only do this when the sink is done; probably missing a done field
// in TReportExecStatus for that
@@ -672,30 +668,46 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
}
if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
- // This backend execution has completed.
+ // This report made this backend done, so update the status and
+ // num_remaining_backends_.
+
+ // for now, abort the query if we see any error except if returned_all_results_ is
+ // true (UpdateStatus() initiates cancellation, if it hasn't already been)
+ // TODO: clarify control flow here, it's unclear we should even process this status
+ // report if returned_all_results_ is true
bool is_fragment_failure;
TUniqueId failed_instance_id;
Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
- int pending_backends = backend_exec_complete_barrier_->Notify();
- if (VLOG_QUERY_IS_ON && pending_backends >= 0) {
- VLOG_QUERY << "Backend completed:"
- << " host=" << TNetworkAddressToString(backend_state->impalad_address())
- << " remaining=" << pending_backends
- << " query_id=" << PrintId(query_id());
+ if (!status.ok() && !returned_all_results_) {
+ Status ignored =
+ UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
+ is_fragment_failure, failed_instance_id);
+ return Status::OK();
+ }
+
+ lock_guard<mutex> l(lock_);
+ DCHECK_GT(num_remaining_backends_, 0);
+ if (VLOG_QUERY_IS_ON && num_remaining_backends_ > 1) {
+ VLOG_QUERY << "Backend completed: "
+ << " host=" << TNetworkAddressToString(backend_state->impalad_address())
+ << " remaining=" << num_remaining_backends_ - 1
+ << " query_id=" << PrintId(query_id());
BackendState::LogFirstInProgress(backend_states_);
}
- if (!status.ok()) {
- // TODO: IMPALA-5119: call UpdateExecState() asynchronously rather than
- // from within this RPC handler (since it can make RPCs).
- discard_result(UpdateExecState(status,
- is_fragment_failure ? &failed_instance_id : nullptr,
- TNetworkAddressToString(backend_state->impalad_address())));
+ if (--num_remaining_backends_ == 0 || !status.ok()) {
+ backend_completion_cv_.NotifyAll();
}
+ return Status::OK();
}
// If all results have been returned, return a cancelled status to force the fragment
// instance to stop executing.
- // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
- return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
+ if (returned_all_results_) return Status::CANCELLED;
+
+ return Status::OK();
+}
+
+RuntimeState* Coordinator::runtime_state() {
+ return coord_instance_ == nullptr ? nullptr : coord_instance_->runtime_state();
}
// TODO: add histogram/percentile
@@ -728,14 +740,20 @@ void Coordinator::ComputeQuerySummary() {
string Coordinator::GetErrorLog() {
ErrorLogMap merged;
- {
- lock_guard<SpinLock> l(backend_states_init_lock_);
- for (BackendState* state: backend_states_) state->MergeErrorLog(&merged);
+ for (BackendState* state: backend_states_) {
+ state->MergeErrorLog(&merged);
}
return PrintErrorMapToString(merged);
}
void Coordinator::ReleaseExecResources() {
+ lock_guard<mutex> l(lock_);
+ ReleaseExecResourcesLocked();
+}
+
+void Coordinator::ReleaseExecResourcesLocked() {
+ if (released_exec_resources_) return;
+ released_exec_resources_ = true;
if (filter_routing_table_.size() > 0) {
query_profile_->AddInfoString("Final filter table", FilterDebugString());
}
@@ -749,6 +767,8 @@ void Coordinator::ReleaseExecResources() {
}
// This may be NULL while executing UDFs.
if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
+ // Need to protect against failed Prepare(), where coord_sink_ would not be set.
+ if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
// Now that we've released our own resources, can release query-wide resources.
if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
// At this point some tracked memory may still be used in the coordinator for result
@@ -756,20 +776,27 @@ void Coordinator::ReleaseExecResources() {
}
void Coordinator::ReleaseAdmissionControlResources() {
- LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
+ lock_guard<mutex> l(lock_);
+ ReleaseAdmissionControlResourcesLocked();
+}
+
+void Coordinator::ReleaseAdmissionControlResourcesLocked() {
+ if (released_admission_control_resources_) return;
+ LOG(INFO) << "Release admission control resources for query_id="
+ << PrintId(query_ctx().query_id);
AdmissionController* admission_controller =
ExecEnv::GetInstance()->admission_controller();
if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
+ released_admission_control_resources_ = true;
query_events_->MarkEvent("Released admission control resources");
}
void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "UpdateFilter() called although runtime filters are disabled";
- DCHECK(exec_rpcs_complete_barrier_.get() != nullptr)
+ DCHECK(exec_complete_barrier_.get() != nullptr)
<< "Filters received before fragments started!";
-
- exec_rpcs_complete_barrier_->Wait();
+ exec_complete_barrier_->Wait();
DCHECK(filter_routing_table_complete_)
<< "Filter received before routing table complete";
@@ -840,7 +867,6 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
rpc_params.__set_dst_query_id(query_id());
rpc_params.__set_filter_id(params.filter_id);
- // Waited for exec_rpcs_complete_barrier_ so backend_states_ is valid.
for (BackendState* bs: backend_states_) {
for (int fragment_idx: target_fragment_idxs) {
rpc_params.__set_dst_fragment_idx(fragment_idx);
@@ -914,19 +940,23 @@ void Coordinator::FilterState::Disable(MemTracker* tracker) {
}
}
+const TUniqueId& Coordinator::query_id() const {
+ return query_ctx().query_id;
+}
+
void Coordinator::GetTExecSummary(TExecSummary* exec_summary) {
lock_guard<SpinLock> l(exec_summary_.lock);
*exec_summary = exec_summary_.thrift_exec_summary;
}
MemTracker* Coordinator::query_mem_tracker() const {
- return query_state_->query_mem_tracker();
+ return query_state()->query_mem_tracker();
}
void Coordinator::BackendsToJson(Document* doc) {
Value states(kArrayType);
{
- lock_guard<SpinLock> l(backend_states_init_lock_);
+ lock_guard<mutex> l(lock_);
for (BackendState* state : backend_states_) {
Value val(kObjectType);
state->ToJson(&val, doc);
@@ -939,7 +969,7 @@ void Coordinator::BackendsToJson(Document* doc) {
void Coordinator::FInstanceStatsToJson(Document* doc) {
Value states(kArrayType);
{
- lock_guard<SpinLock> l(backend_states_init_lock_);
+ lock_guard<mutex> l(lock_);
for (BackendState* state : backend_states_) {
Value val(kObjectType);
state->InstanceStatsToJson(&val, doc);
@@ -949,14 +979,6 @@ void Coordinator::FInstanceStatsToJson(Document* doc) {
doc->AddMember("backend_instances", states, doc->GetAllocator());
}
-const TQueryCtx& Coordinator::query_ctx() const {
- return schedule_.request().query_ctx;
-}
-
-const TUniqueId& Coordinator::query_id() const {
- return query_ctx().query_id;
-}
-
const TFinalizeParams* Coordinator::finalize_params() const {
return schedule_.request().__isset.finalize_params
? &schedule_.request().finalize_params : nullptr;
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36c9f26..723047b 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -20,15 +20,26 @@
#include <string>
#include <vector>
+#include <boost/accumulators/accumulators.hpp>
+#include <boost/accumulators/statistics/max.hpp>
+#include <boost/accumulators/statistics/mean.hpp>
+#include <boost/accumulators/statistics/median.hpp>
+#include <boost/accumulators/statistics/min.hpp>
+#include <boost/accumulators/statistics/stats.hpp>
+#include <boost/accumulators/statistics/variance.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/unordered_map.hpp>
+#include <boost/unordered_set.hpp>
#include <rapidjson/document.h>
#include "common/global-types.h"
+#include "common/hdfs.h"
#include "common/status.h"
#include "gen-cpp/Frontend_types.h"
#include "gen-cpp/Types_types.h"
#include "runtime/dml-exec-state.h"
+#include "runtime/query-state.h"
+#include "scheduling/query-schedule.h"
#include "util/condition-variable.h"
#include "util/progress-updater.h"
#include "util/runtime-profile-counters.h"
@@ -45,7 +56,6 @@ class TPlanExecRequest;
class TRuntimeProfileTree;
class RuntimeProfile;
class QueryResultSet;
-class QuerySchedule;
class MemTracker;
class PlanRootSink;
class FragmentInstanceState;
@@ -55,9 +65,10 @@ class QueryState;
/// Query coordinator: handles execution of fragment instances on remote nodes, given a
/// TQueryExecRequest. As part of that, it handles all interactions with the executing
/// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation. Once a query ends, either by returning EOS, through
-/// client cancellation, returning an error, or by finalizing a DML request, the
-/// coordinator releases resources.
+/// query, including cancellation. Once a query ends, either through cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
///
/// The coordinator monitors the execution status of fragment instances and aborts the
/// entire query if an error is reported by any of them.
@@ -66,80 +77,80 @@ class QueryState;
/// rows are produced by a fragment instance that always executes on the same machine as
/// the coordinator.
///
-/// Thread-safe except where noted.
-///
+/// Thread-safe, with the exception of GetNext().
+///
/// A typical sequence of calls for a single query (calls under the same numbered
/// item can happen concurrently):
/// 1. client: Exec()
/// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus()
/// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus()
///
-/// A query is considered to be executing until one of three things occurs:
-/// 1. An error is encountered. Backend cancellation is automatically initiated for all
-/// backends that haven't yet completed and the overall query status is set to the
-/// first (non-cancelled) encountered error status.
-/// 2. The query is cancelled via an explicit Cancel() call. The overall query status
-/// is set to CANCELLED and cancellation is initiated for all backends still
-/// executing (without an error status).
-/// 3. The query has returned all rows. The overall query status is OK (and remains
-/// OK). Client cancellation is no longer possible and subsequent backend errors are
-/// ignored. (TODO: IMPALA-6984 initiate backend cancellation in this case).
-///
-/// Lifecycle: this object must not be destroyed until after one of the three states
-/// above is reached (error, cancelled, or EOS) to ensure resources are released.
-///
-/// Lock ordering: (lower-numbered acquired before higher-numbered)
-/// 1. wait_lock_
-/// 2. exec_state_lock_, backend_states_init_lock_, filter_lock_,
-/// ExecSummary::lock (leafs)
+/// The implementation ensures that setting an overall error status and initiating
+/// cancellation of all fragment instances is atomic.
///
/// TODO: move into separate subdirectory and move nested classes into separate files
/// and unnest them
+/// TODO: clean up locking behavior; in particular, clarify dependency on lock_
+/// TODO: clarify cancellation path; in particular, cancel as soon as we return
+/// all results
class Coordinator { // NOLINT: The member variables could be re-ordered to save space
public:
Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events);
~Coordinator();
- /// Initiate asynchronous execution of a query with the given schedule. When it
- /// returns, all fragment instances have started executing at their respective
- /// backends. Exec() must be called exactly once and a call to Exec() must precede
- /// all other member function calls.
+ /// Initiate asynchronous execution of a query with the given schedule. When it returns,
+ /// all fragment instances have started executing at their respective backends.
+ /// A call to Exec() must precede all other member function calls.
Status Exec() WARN_UNUSED_RESULT;
/// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
- /// query doesn't return rows, until the query finishes or is cancelled. A call to
- /// Wait() must precede all calls to GetNext(). Multiple calls to Wait() are
- /// idempotent and it is okay to issue multiple Wait() calls concurrently.
+ /// query doesn't return rows, until the query finishes or is cancelled.
+ /// A call to Wait() must precede all calls to GetNext().
+ /// Multiple calls to Wait() are idempotent and it is okay to issue multiple
+ /// Wait() calls concurrently.
Status Wait() WARN_UNUSED_RESULT;
/// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows'
- /// rows, but will not return more. If *eos is true, all rows have been returned.
- /// Returns a non-OK status if an error was encountered either locally or by any of
- /// the executing backends, or if the query was cancelled via Cancel(). After *eos
- /// is true, subsequent calls to GetNext() will be a no-op.
+ /// rows, but will not return more.
+ ///
+ /// If *eos is true, execution has completed. Subsequent calls to GetNext() will be a
+ /// no-op.
+ ///
+ /// GetNext() will not set *eos=true until all fragment instances have either completed
+ /// or have failed.
+ ///
+ /// Returns an error status if an error was encountered either locally or by any of the
+ /// remote fragments or if the query was cancelled.
///
/// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext()
- /// calls.
+ /// calls (but may call any of the other member functions concurrently with GetNext()).
Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT;
- /// Cancel execution of query and sets the overall query status to CANCELLED if the
- /// query is still executing. Idempotent.
- void Cancel();
+ /// Cancel execution of query. This includes the execution of the local plan fragment,
+ /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
+ /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
+ /// Idempotent.
+ void Cancel(const Status* cause = nullptr);
- /// Called by the report status RPC handler to update execution status of a
- /// particular backend as well as dml_exec_state_ and the profile.
+ /// Updates execution status of a particular backend as well as dml_exec_state_.
+ /// Also updates num_remaining_backends_ and cancels execution if the backend has an
+ /// error status.
Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
WARN_UNUSED_RESULT;
+ /// Only valid to call after Exec().
+ QueryState* query_state() const { return query_state_; }
+
/// Get cumulative profile aggregated over all fragments of the query.
/// This is a snapshot of the current state of execution and will change in
/// the future if not all fragments have finished execution.
RuntimeProfile* query_profile() const { return query_profile_; }
- /// Safe to call only after Exec().
+ const TUniqueId& query_id() const;
+
MemTracker* query_mem_tracker() const;
- /// Safe to call only after Wait().
+ /// Safe to call only after Wait().
DmlExecState* dml_exec_state() { return &dml_exec_state_; }
/// Return error log for coord and all the fragments. The error messages from the
@@ -148,6 +159,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
const ProgressUpdater& progress() { return progress_; }
+ /// Returns query_status_.
+ Status GetStatus();
+
/// Get a copy of the current exec summary. Thread-safe.
void GetTExecSummary(TExecSummary* exec_summary);
@@ -174,20 +188,18 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// owned by the ClientRequestState that owns this coordinator
const QuerySchedule& schedule_;
- /// Copied from TQueryExecRequest, governs when finalization occurs. Set in Exec().
+ /// Copied from TQueryExecRequest, governs when to call ComputeQuerySummary().
TStmtType::type stmt_type_;
- /// BackendStates for all execution backends, including the coordinator. All elements
- /// are non-nullptr and owned by obj_pool(). Populated by Exec()/InitBackendStates().
+ /// BackendStates for all execution backends, including the coordinator.
+ /// All elements are non-nullptr. Owned by obj_pool(). Populated by
+ /// InitBackendStates().
std::vector<BackendState*> backend_states_;
- /// Protects the population of backend_states_ vector (not the BackendState objects).
- /// Used when accessing backend_states_ if it's not guaranteed that
- /// InitBackendStates() has completed.
- SpinLock backend_states_init_lock_;
+ /// Index into backend_states_ for coordinator fragment; -1 if no coordinator fragment.
+ int coord_backend_idx_ = -1;
- /// The QueryState for this coordinator. Reference taken in Exec(). Reference
- /// released in destructor.
+ /// The QueryState for this coordinator. Set in Exec(). Released in TearDown().
QueryState* query_state_ = nullptr;
/// Non-null if and only if the query produces results for the client; i.e. is of
@@ -198,10 +210,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// Result rows are materialized by this fragment instance in its own thread. They are
/// materialized into a QueryResultSet provided to the coordinator during GetNext().
///
- /// Owned by the QueryState. Set in Exec().
+ /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+ /// reference of QueryState released) in TearDown().
FragmentInstanceState* coord_instance_ = nullptr;
- /// Owned by the QueryState. Set in Exec().
+ /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
+ /// GetNext() hits eos.
PlanRootSink* coord_sink_ = nullptr;
/// ensures single-threaded execution of Wait(). See lock ordering class comment.
@@ -209,17 +223,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
bool has_called_wait_ = false; // if true, Wait() was called; protected by wait_lock_
- /// Keeps track of number of completed ranges and total scan ranges. Initialized by
- /// Exec().
+ /// Keeps track of number of completed ranges and total scan ranges.
ProgressUpdater progress_;
- /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
- RuntimeProfile* query_profile_ = nullptr;
-
- /// Total time spent in finalization (typically 0 except for INSERT into hdfs
- /// tables). Set in Exec().
- RuntimeProfile::Counter* finalization_timer_ = nullptr;
-
/// Total number of filter updates received (always 0 if filter mode is not
/// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
RuntimeProfile::Counter* filter_updates_received_ = nullptr;
@@ -250,7 +256,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
void Init(const QuerySchedule& query_schedule);
};
- // Initialized by Exec().
ExecSummary exec_summary_;
/// Filled in as the query completes and tracks the results of DML queries. This is
@@ -258,40 +263,52 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// coordinator fragment: only one of the two can legitimately produce updates.
DmlExecState dml_exec_state_;
+ /// Aggregate counters for the entire query. Lives in 'obj_pool_'.
+ RuntimeProfile* query_profile_ = nullptr;
+
+ /// Protects all fields below. This is held while making RPCs, so this lock should
+ /// only be acquired if the acquiring thread is prepared to wait for a significant
+ /// time.
+ /// TODO: clarify to what extent the fields below need to be protected by lock_
+ /// Lock ordering is
+ /// 1. wait_lock_
+ /// 2. lock_
+ /// 3. BackendState::lock_
+ /// 4. filter_lock_
+ boost::mutex lock_;
+
+ /// Overall status of the entire query; set to the first reported fragment error
+ /// status or to CANCELLED, if Cancel() is called.
+ Status query_status_;
+
+ /// If true, the query is done returning all results. It is possible that the
+ /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries
+ /// with limit).
+ /// Once this is set to true, errors from execution backends are ignored.
+ bool returned_all_results_ = false;
+
+ /// If there is no coordinator fragment, Wait() simply waits until all
+ /// backends report completion by notifying on backend_completion_cv_.
+ /// Tied to lock_.
+ ConditionVariable backend_completion_cv_;
+
+ /// Count of the number of backends for which done != true. When this
+ /// hits 0, any Wait()'ing thread is notified.
+ int num_remaining_backends_ = 0;
+
/// Event timeline for this query. Not owned.
RuntimeProfile::EventSequence* query_events_ = nullptr;
- /// Indexed by fragment idx (TPlanFragment.idx). Filled in
- /// Exec()/InitFragmentStats(), elements live in obj_pool(). Updated by BackendState
- /// sequentially, without synchronization.
+ /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(),
+ /// elements live in obj_pool().
std::vector<FragmentStats*> fragment_stats_;
- /// Barrier that is released when all calls to BackendState::Exec() have
- /// returned. Initialized in StartBackendExec().
- boost::scoped_ptr<CountingBarrier> exec_rpcs_complete_barrier_;
-
- /// Barrier that is released when all backends have indicated execution completion,
- /// or when all backends are cancelled due to an execution error or client requested
- /// cancellation. Initialized in StartBackendExec().
- boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
-
- SpinLock exec_state_lock_; // protects exec-state_ and exec_status_
-
- /// EXECUTING: in-flight; the only non-terminal state
- /// RETURNED_RESULTS: GetNext() set eos to true, or for DML, the request is complete
- /// CANCELLED: Cancel() was called (not: someone called CancelBackends())
- /// ERROR: received an error from a backend
- enum class ExecState {
- EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR
- };
- ExecState exec_state_ = ExecState::EXECUTING;
+ /// total time spent in finalization (typically 0 except for INSERT into hdfs tables)
+ RuntimeProfile::Counter* finalization_timer_ = nullptr;
- /// Overall execution status; only set on exec_state_ transitions:
- /// - EXECUTING: OK
- /// - RETURNED_RESULTS: OK
- /// - CANCELLED: CANCELLED
- /// - ERROR: error status
- Status exec_status_;
+ /// Barrier that is released when all calls to BackendState::Exec() have
+ /// returned, successfully or not. Initialized during Exec().
+ boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
/// Protects filter_routing_table_.
SpinLock filter_lock_;
@@ -304,6 +321,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// safe to concurrently read from filter_routing_table_.
bool filter_routing_table_complete_ = false;
+ /// True if and only if ReleaseExecResources() has been called.
+ bool released_exec_resources_ = false;
+
+ /// True if and only if ReleaseAdmissionControlResources() has been called.
+ bool released_admission_control_resources_ = false;
+
/// Returns a local object pool.
ObjectPool* obj_pool() { return obj_pool_.get(); }
@@ -311,67 +334,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// HDFS INSERT finalization is not required.
const TFinalizeParams* finalize_params() const;
- const TQueryCtx& query_ctx() const;
+ const TQueryCtx& query_ctx() const { return schedule_.request().query_ctx; }
- const TUniqueId& query_id() const;
+ /// Only valid *after* calling Exec(). Returns nullptr if the running query does not
+ /// produce any rows.
+ RuntimeState* runtime_state();
/// Returns a pretty-printed table of the current filter state.
std::string FilterDebugString();
- /// Called when the query is done executing due to reaching EOS or client
- /// cancellation. If 'exec_state_' != EXECUTING, does nothing. Otherwise sets
- /// 'exec_state_' to 'state' (must be either CANCELLED or RETURNED_RESULTS), and
- /// finalizes execution (cancels remaining backends if transitioning to CANCELLED;
- /// either way, calls ComputeQuerySummary() and releases resources). Returns the
- /// resulting overall execution status.
- Status SetNonErrorTerminalState(const ExecState state) WARN_UNUSED_RESULT;
-
- /// Transitions 'exec_state_' given an execution status and returns the resulting
- /// overall status:
- ///
- /// - if the 'status' parameter is ok, no state transition
- /// - if 'exec_state_' is EXECUTING and 'status' is not ok, transitions to ERROR
- /// - if 'exec_state_' is already RETURNED_RESULTS, CANCELLED, or ERROR: does not
- /// transition state (those are terminal states) however in the case of ERROR,
- /// status may be updated to a more interesting status.
- ///
- /// Should not be called for (client initiated) cancellation. Call
- /// SetNonErrorTerminalState(CANCELLED) instead.
- ///
- /// 'failed_finstance' is the fragment instance id that has failed (or nullptr if the
- /// failure is not specific to a fragment instance), used for error reporting along
- /// with 'instance_hostname'.
- Status UpdateExecState(const Status& status, const TUniqueId* failed_finstance,
- const string& instance_hostname) WARN_UNUSED_RESULT;
-
- /// Helper for SetNonErrorTerminalState() and UpdateExecStateIfError(). If the caller
- /// transitioned to a terminal state (which happens exactly once for the lifetime of
- /// the Coordinator object), then finalizes execution (cancels remaining backends if
- /// transitioning to CANCELLED; in all cases releases resources and calls
- /// ComputeQuerySummary().
- void HandleExecStateTransition(const ExecState old_state, const ExecState new_state);
-
- /// Return true if 'exec_state_' is RETURNED_RESULTS.
- /// TODO: remove with IMPALA-6984.
- bool ReturnedAllResults() WARN_UNUSED_RESULT;
-
- /// Return the string representation of 'state'.
- static const char* ExecStateToString(const ExecState state);
-
- // For DCHECK_EQ, etc of ExecState values.
- friend std::ostream& operator<<(std::ostream& o, const ExecState s) {
- return o << ExecStateToString(s);
- }
-
- /// Helper for HandleExecStateTransition(). Sends cancellation request to all
- /// executing backends but does not wait for acknowledgement from the backends. The
- /// ExecState state-machine ensures this is called at most once.
- void CancelBackends();
-
- /// Returns only when either all execution backends have reported success or a request
- /// to cancel the backends has already been sent. It is safe to call this concurrently,
- /// but any calls must be made only after Exec().
- void WaitForBackends();
+ /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when
+ /// the query is not being cancelled in the case where the query limit is reached.
+ void CancelInternal();
+
+ /// Acquires lock_ and updates query_status_ with 'status' if it's not already
+ /// an error status, and returns the current query_status_. The status may be
+ /// due to an error in a specific fragment instance, or it can be a general error
+ /// not tied to a specific fragment instance.
+ /// Calls CancelInternal() when switching to an error status.
+ /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
+ /// be true and 'failed_fragment' is the fragment instance id that has failed, used for error
+ /// reporting. For a general error not tied to a specific instance,
+ /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
+ /// 'backend_hostname' is used for error reporting in either case.
+ Status UpdateStatus(const Status& status, const std::string& backend_hostname,
+ bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
+
+ /// Returns only when either all execution backends have reported success or the query
+ /// is in error. Returns the status of the query.
+ /// It is safe to call this concurrently, but any calls must be made only after Exec().
+ Status WaitForBackendCompletion() WARN_UNUSED_RESULT;
/// Initializes fragment_stats_ and query_profile_. Must be called before
/// InitBackendStates().
@@ -393,33 +385,36 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// finishing the INSERT in flight.
Status FinalizeHdfsInsert() WARN_UNUSED_RESULT;
- /// Helper for Exec(). Populates backend_states_, starts query execution at all
- /// backends in parallel, and blocks until startup completes.
+ /// Populates backend_states_, starts query execution at all backends in parallel, and
+ /// blocks until startup completes.
void StartBackendExec();
- /// Helper for Exec(). Checks for errors encountered when starting backend execution,
- /// using any non-OK status, if any, as the overall status. Returns the overall
- /// status. Also updates query_profile_ with the startup latency histogram.
+ /// Calls CancelInternal() and returns an error if there was any error starting
+ /// backend execution.
+ /// Also updates query_profile_ with the startup latency histogram.
Status FinishBackendStartup() WARN_UNUSED_RESULT;
/// Build the filter routing table by iterating over all plan nodes and collecting the
/// filters that they either produce or consume.
void InitFilterRoutingTable();
- /// Helper for HandleExecStateTransition(). Releases all resources associated with
- /// query execution. The ExecState state-machine ensures this is called exactly once.
+ /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
void ReleaseExecResources();
- /// Helper for HandleExecStateTransition(). Releases admission control resources for
- /// use by other queries. This should only be called if one of following
- /// preconditions is satisfied for each backend on which the query is executing:
- ///
- /// * The backend finished execution. Rationale: the backend isn't consuming
- /// resources.
+ /// Same as ReleaseExecResources() except the lock must be held by the caller.
+ void ReleaseExecResourcesLocked();
+
+ /// Releases admission control resources for use by other queries.
+ /// This should only be called if one of the following preconditions is satisfied
+ /// for each backend on which the query is executing:
+ /// * The backend finished execution.
+ /// Rationale: the backend isn't consuming resources.
+ ///
/// * A cancellation RPC was delivered to the backend.
/// Rationale: the backend will be cancelled and release resources soon. By the
/// time a newly admitted query fragment starts up on the backend and starts consuming
/// resources, the resources from this query will probably have been released.
+ ///
/// * Sending the cancellation RPC to the backend failed
/// Rationale: the backend is either down or will tear itself down when it next tries
/// to send a status RPC to the coordinator. It's possible that the fragment will be
@@ -430,13 +425,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// pessimistically queueing queries while we wait for a response from a backend that
/// may never come.
///
- /// Calling WaitForBackends() or CancelBackends() before this function is sufficient
- /// to satisfy the above preconditions. If the query has an expensive finalization
- /// step post query execution (e.g. a DML statement), then this should be called
- /// after that completes to avoid over-admitting queries.
+ /// Calling WaitForBackendCompletion() or CancelInternal() before this function is
+ /// sufficient to satisfy the above preconditions. If the query has an expensive
+ /// finalization step post query execution (e.g. a DML statement), then this should
+ /// be called after that completes to avoid over-admitting queries.
///
- /// The ExecState state-machine ensures this is called exactly once.
+ /// Acquires lock_. Idempotent.
void ReleaseAdmissionControlResources();
+
+ /// Same as ReleaseAdmissionControlResources() except lock_ must be held by the caller.
+ void ReleaseAdmissionControlResourcesLocked();
};
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 2ca1256..12b9b78 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -904,7 +904,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
// Cancel the parent query. 'lock_' should not be held because cancellation involves
// RPCs and can block for a long time.
- if (coord != NULL) coord->Cancel();
+ if (coord != NULL) coord->Cancel(cause);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 3af4c9b..fb3f261 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -111,6 +111,11 @@ class ClientRequestState;
/// 6. ClientRequestState::expiration_data_lock_
/// 7. Coordinator::exec_summary_lock
///
+/// Coordinator::lock_ should not be acquired at the same time as the
+/// ImpalaServer/SessionState/ClientRequestState locks. Aside from
+/// Coordinator::exec_summary_lock_, the Coordinator's lock ordering is independent of
+/// the above lock ordering.
+///
/// The following locks are not held in conjunction with other locks:
/// * query_log_lock_
/// * session_timeout_lock_
http://git-wip-us.apache.org/repos/asf/impala/blob/0e6117be/be/src/util/counting-barrier.h
----------------------------------------------------------------------
diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h
index 827c526..49b0bde 100644
--- a/be/src/util/counting-barrier.h
+++ b/be/src/util/counting-barrier.h
@@ -33,23 +33,8 @@ class CountingBarrier {
}
/// Sends one notification, decrementing the number of pending notifications by one.
- /// Returns the remaining pending notifications.
- int32_t Notify() {
- int32_t result = count_.Add(-1);
- if (result == 0) promise_.Set(true);
- return result;
- }
-
- /// Sets the number of pending notifications to 0 and unblocks Wait().
- void NotifyRemaining() {
- while (true) {
- int32_t value = count_.Load();
- if (value <= 0) return; // count_ can legitimately drop below 0
- if (count_.CompareAndSwap(value, 0)) {
- promise_.Set(true);
- return;
- }
- }
+ void Notify() {
+ if (count_.Add(-1) == 0) promise_.Set(true);
}
/// Blocks until all notifications are received.
@@ -59,8 +44,6 @@ class CountingBarrier {
/// case '*timed_out' will be true.
void Wait(int64_t timeout_ms, bool* timed_out) { promise_.Get(timeout_ms, timed_out); }
- int32_t pending() const { return count_.Load(); }
-
private:
/// Used to signal waiters when all notifications are received.
Promise<bool> promise_;