You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:24 UTC
[impala] 10/14: IMPALA-7205: Respond to ReportExecStatus() RPC with
CANCELLED if query execution has terminated
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e5a243e165baa92ae716f1535aaaf4fba5347aad
Author: Dan Hecht <dh...@cloudera.com>
AuthorDate: Mon Jun 25 12:05:16 2018 -0700
IMPALA-7205: Respond to ReportExecStatus() RPC with CANCELLED if query execution has terminated
Otherwise, if the coordinator-to-backend CancelFInstances() RPC had failed,
the query can hang (and/or finstances can continue running until the
query is closed).
Testing:
- the modified test reproduces the hang without the impalad fix
Change-Id: I7bb2c26edace89853f14a329f891d1f9a065a991
Reviewed-on: http://gerrit.cloudera.org:8080/10815
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/coordinator-backend-state.cc | 50 ++++++++++++++++-------------
be/src/runtime/coordinator-backend-state.h | 10 ++++--
be/src/runtime/coordinator.cc | 9 +++---
tests/query_test/test_cancellation.py | 20 ++++++++----
4 files changed, 51 insertions(+), 38 deletions(-)
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index c871a48..a99acdb 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -40,8 +40,8 @@ using namespace rapidjson;
namespace accumulators = boost::accumulators;
Coordinator::BackendState::BackendState(
- const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
- : query_id_(query_id),
+ const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode)
+ : coord_(coord),
state_idx_(state_idx),
filter_mode_(filter_mode) {
}
@@ -150,16 +150,16 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
}
void Coordinator::BackendState::Exec(
- const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+ const DebugOptions& debug_options,
const FilterRoutingTable& filter_routing_table,
CountingBarrier* exec_complete_barrier) {
NotifyBarrierOnExit notifier(exec_complete_barrier);
TExecQueryFInstancesParams rpc_params;
- rpc_params.__set_query_ctx(query_ctx);
+ rpc_params.__set_query_ctx(query_ctx());
SetRpcParams(debug_options, filter_routing_table, &rpc_params);
VLOG_FILE << "making rpc: ExecQueryFInstances"
<< " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
- << PrintId(query_id_);
+ << PrintId(query_id());
// guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns
lock_guard<mutex> l(lock_);
@@ -180,7 +180,7 @@ void Coordinator::BackendState::Exec(
if (!rpc_status.ok()) {
const string& err_msg =
- Substitute(ERR_TEMPLATE, PrintId(query_id_), rpc_status.msg().msg());
+ Substitute(ERR_TEMPLATE, PrintId(query_id()), rpc_status.msg().msg());
VLOG_QUERY << err_msg;
status_ = Status::Expected(err_msg);
return;
@@ -188,7 +188,7 @@ void Coordinator::BackendState::Exec(
Status exec_status = Status(thrift_result.status);
if (!exec_status.ok()) {
- const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id_),
+ const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
exec_status.msg().GetFullMessageDetails());
VLOG_QUERY << err_msg;
status_ = Status::Expected(err_msg);
@@ -196,7 +196,7 @@ void Coordinator::BackendState::Exec(
}
for (const auto& entry: instance_stats_map_) entry.second->stopwatch_.Start();
- VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
+ VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id());
}
Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
@@ -225,7 +225,7 @@ void Coordinator::BackendState::LogFirstInProgress(
for (Coordinator::BackendState* backend_state : backend_states) {
lock_guard<mutex> l(backend_state->lock_);
if (!backend_state->IsDone()) {
- VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id_)
+ VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id())
<< ": first in-progress backend: "
<< TNetworkAddressToString(backend_state->impalad_address());
break;
@@ -351,9 +351,9 @@ bool Coordinator::BackendState::Cancel() {
TCancelQueryFInstancesParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
- params.__set_query_id(query_id_);
+ params.__set_query_id(query_id());
TCancelQueryFInstancesResult dummy;
- VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id_) <<
+ VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id()) <<
" backend=" << TNetworkAddressToString(impalad_address());
Status rpc_status;
@@ -362,26 +362,30 @@ bool Coordinator::BackendState::Cancel() {
for (int i = 0; i < 3; ++i) {
ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(),
impalad_address(), &client_status);
- if (client_status.ok()) {
- // The return value 'dummy' is ignored as it's only set if the fragment instance
- // cannot be found in the backend. The fragment instances of a query can all be
- // cancelled locally in a backend due to RPC failure to coordinator. In which case,
- // the query state can be gone already.
- rpc_status = backend_client.DoRpc(
- &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
- if (rpc_status.ok()) break;
- }
+ if (!client_status.ok()) continue;
+
+ rpc_status = DebugAction(query_ctx().client_request.query_options,
+ "COORD_CANCEL_QUERY_FINSTANCES_RPC");
+ if (!rpc_status.ok()) continue;
+
+ // The return value 'dummy' is ignored as it's only set if the fragment
+ // instance cannot be found in the backend. The fragment instances of a query
+ // can all be cancelled locally in a backend due to RPC failure to
+ // coordinator. In which case, the query state can be gone already.
+ rpc_status = backend_client.DoRpc(
+ &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
+ if (rpc_status.ok()) break;
}
if (!client_status.ok()) {
status_.MergeStatus(client_status);
- VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_)
+ VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
<< " failed to connect to " << TNetworkAddressToString(impalad_address())
<< " :" << client_status.msg().msg();
return true;
}
if (!rpc_status.ok()) {
status_.MergeStatus(rpc_status);
- VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_)
+ VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
<< " rpc to " << TNetworkAddressToString(impalad_address())
<< " failed: " << rpc_status.msg().msg();
return true;
@@ -390,7 +394,7 @@ bool Coordinator::BackendState::Cancel() {
}
void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
- DCHECK(rpc_params.dst_query_id == query_id_);
+ DCHECK(rpc_params.dst_query_id == query_id());
{
// If the backend is already done, it's not waiting for this filter, so we skip
// sending it in this case.
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e7af2e2..c51c16c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -54,7 +54,7 @@ struct FInstanceExecParams;
/// Thread-safe unless pointed out otherwise.
class Coordinator::BackendState {
public:
- BackendState(const TUniqueId& query_id, int state_idx,
+ BackendState(const Coordinator& coord, int state_idx,
TRuntimeFilterMode::type filter_mode);
/// Creates InstanceStats for all instance in backend_exec_params in obj_pool
@@ -70,7 +70,7 @@ class Coordinator::BackendState {
/// that weren't selected during its construction.
/// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
/// on their node_id/instance_idx.
- void Exec(const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+ void Exec(const DebugOptions& debug_options,
const FilterRoutingTable& filter_routing_table,
CountingBarrier* rpc_complete_barrier);
@@ -202,7 +202,8 @@ class Coordinator::BackendState {
void InitCounters();
};
- const TUniqueId query_id_;
+ const Coordinator& coord_; /// Coordinator object that owns this BackendState
+
const int state_idx_; /// index of 'this' in Coordinator::backend_states_
const TRuntimeFilterMode::type filter_mode_;
@@ -256,6 +257,9 @@ class Coordinator::BackendState {
/// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
int64_t last_report_time_ms_ = 0;
+ const TQueryCtx& query_ctx() const { return coord_.query_ctx(); }
+ const TUniqueId& query_id() const { return coord_.query_id(); }
+
/// Fill in rpc_params based on state. Uses filter_routing_table to remove filters
/// that weren't selected during its construction.
void SetRpcParams(const DebugOptions& debug_options,
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3489312..a240e85 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -205,7 +205,7 @@ void Coordinator::InitBackendStates() {
int backend_idx = 0;
for (const auto& entry: schedule_.per_backend_exec_params()) {
BackendState* backend_state = obj_pool()->Add(
- new BackendState(query_id(), backend_idx, filter_mode_));
+ new BackendState(*this, backend_idx, filter_mode_));
backend_state->Init(entry.second, fragment_stats_, obj_pool());
backend_states_[backend_idx++] = backend_state;
}
@@ -335,7 +335,7 @@ void Coordinator::StartBackendExec() {
ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
[backend_state, this, &debug_options]() {
DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC");
- backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
+ backend_state->Exec(debug_options, filter_routing_table_,
exec_rpcs_complete_barrier_.get());
});
}
@@ -708,10 +708,9 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
// We've applied all changes from the final status report - notify waiting threads.
backend_exec_complete_barrier_->Notify();
}
- // If all results have been returned, return a cancelled status to force the fragment
+ // If query execution has terminated, return a cancelled status to force the fragment
// instance to stop executing.
- // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
- return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
+ return exec_state_.Load() == ExecState::EXECUTING ? Status::OK() : Status::CANCELLED;
}
// TODO: add histogram/percentile
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index 4bcbff0..31d51e2 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -47,8 +47,11 @@ CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
# Number of times to execute/cancel each query under test
NUM_CANCELATION_ITERATIONS = 1
-# Test cancellation on both running and hung queries
-DEBUG_ACTIONS = [None, 'WAIT']
+# Test cancellation on both running and hung queries. Node ID 0 is the scan node
+WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']
+
+# Verify that failed CancelFInstances() RPCs don't lead to hung queries
+FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL']
# Verify close rpc running concurrently with fetch rpc. The two cases verify:
# False: close and fetch rpc run concurrently.
@@ -75,7 +78,9 @@ class TestCancellation(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS))
cls.ImpalaTestMatrix.add_dimension(
- ImpalaTestDimension('action', *DEBUG_ACTIONS))
+ ImpalaTestDimension('wait_action', *WAIT_ACTIONS))
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('fail_rpc_action', *FAIL_RPC_ACTIONS))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE))
cls.ImpalaTestMatrix.add_dimension(
@@ -125,9 +130,10 @@ class TestCancellation(ImpalaTestSuite):
(file_format, query)
join_before_close = vector.get_value('join_before_close')
- action = vector.get_value('action')
- # node ID 0 is the scan node
- debug_action = '0:GETNEXT:' + action if action != None else ''
+ wait_action = vector.get_value('wait_action')
+ fail_rpc_action = vector.get_value('fail_rpc_action')
+
+ debug_action = "|".join(filter(None, [wait_action, fail_rpc_action]))
vector.get_value('exec_option')['debug_action'] = debug_action
vector.get_value('exec_option')['buffer_pool_limit'] =\
@@ -194,7 +200,7 @@ class TestCancellation(ImpalaTestSuite):
# Executing the same query without canceling should work fine. Only do this if the
# query has a limit or aggregation
- if action is None and ('count' in query or 'limit' in query):
+ if not debug_action and ('count' in query or 'limit' in query):
self.execute_query(query, vector.get_value('exec_option'))
def teardown_method(self, method):