Posted to commits@impala.apache.org by wz...@apache.org on 2022/04/30 05:26:56 UTC
[impala] branch master updated: IMPALA-11263: Coordinator hang when cancelling a query
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 38f68a31e IMPALA-11263: Coordinator hang when cancelling a query
38f68a31e is described below
commit 38f68a31edcd2ff4ef52c47f33b8999082750645
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Apr 13 21:19:51 2022 -0700
IMPALA-11263: Coordinator hang when cancelling a query
In a rare case, callback Coordinator::BackendState::ExecCompleteCb()
is not called for the corresponding ExecQueryFInstances RPC when the
RPC is cancelled. This causes the coordinator to wait indefinitely when
calling Coordinator::BackendState::Cancel() to cancel a fragment
instance.
This patch adds a timeout to BackendState::WaitOnExecLocked() so that the
coordinator is not blocked indefinitely when cancelling a query.
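For illustration, the core of the fix is a predicate-guarded wait with an
optional deadline. Below is a minimal sketch of that pattern using
std::condition_variable in place of Impala's ConditionVariable wrapper; apart
from the names exec_done_/timeout_ms mirrored from the patch, everything here
is hypothetical:

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Sketch: returns true if 'done' became true before the deadline.
    // A 'timeout_ms' <= 0 means wait indefinitely. The predicate is
    // re-checked in a loop to tolerate spurious wakeups.
    bool WaitOnExecSketch(std::unique_lock<std::mutex>& l, const bool& done,
        std::condition_variable& cv, int64_t timeout_ms) {
      bool timed_out = false;
      while (!done && !timed_out) {
        if (timeout_ms <= 0) {
          cv.wait(l);
        } else {
          timed_out =
              cv.wait_for(l, std::chrono::milliseconds(timeout_ms)) ==
              std::cv_status::timeout;
        }
      }
      return !timed_out;
    }

On timeout the caller marks the backend done itself and proceeds, which is
what breaks the hang.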
Testing:
- Added a test case to simulate the missing callback when a query
fails. Verified that the coordinator would hang without the fix
and does not hang with it.
- Passed exhaustive-debug tests.
Change-Id: I915511afe2df3017cbbf37f6aff3c5ff7f5473be
Reviewed-on: http://gerrit.cloudera.org:8080/18439
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/coordinator-backend-state.cc | 200 ++++++++++++++++------------
be/src/runtime/coordinator-backend-state.h | 7 +-
tests/custom_cluster/test_rpc_timeout.py | 33 ++++-
3 files changed, 149 insertions(+), 91 deletions(-)
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 98e0938c3..5b72ae946 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -195,14 +195,22 @@ void Coordinator::BackendState::SetExecError(
void Coordinator::BackendState::WaitOnExecRpc() {
unique_lock<mutex> l(lock_);
- WaitOnExecLocked(&l);
+ discard_result(WaitOnExecLocked(&l));
}
-void Coordinator::BackendState::WaitOnExecLocked(unique_lock<mutex>* l) {
+bool Coordinator::BackendState::WaitOnExecLocked(
+ unique_lock<mutex>* l, int64_t timeout_ms) {
DCHECK(l->owns_lock());
- while (!exec_done_) {
- exec_done_cv_.Wait(*l);
+ bool timed_out = false;
+ while (!exec_done_ && !timed_out) {
+ if (timeout_ms <= 0) {
+ exec_done_cv_.Wait(*l);
+ } else {
+ // WaitFor returns true if notified in time.
+ timed_out = !exec_done_cv_.WaitFor(*l, timeout_ms * MICROS_PER_MILLI);
+ }
}
+ return !timed_out;
}
void Coordinator::BackendState::ExecCompleteCb(
@@ -210,6 +218,15 @@ void Coordinator::BackendState::ExecCompleteCb(
{
lock_guard<mutex> l(lock_);
exec_rpc_status_ = exec_rpc_controller_.status();
+
+ Status complete_cb_debug_status =
+ DebugAction(exec_params_.query_options(), "IMPALA_MISS_EXEC_COMPLETE_CB");
+ if (UNLIKELY(exec_rpc_status_.ok() && !complete_cb_debug_status.ok())) {
+ // Simulate a missing callback for a successful RPC.
+ LOG(ERROR) << "Debug action: missing ExecComplete callback";
+ return;
+ }
+
rpc_latency_ = MonotonicMillis() - start_ms;
if (!exec_rpc_status_.ok()) {
@@ -579,101 +596,108 @@ Coordinator::BackendState::CancelResult Coordinator::BackendState::Cancel(
bool fire_and_forget) {
// Update 'result' based on the actions we take in this function and/or errors we hit.
CancelResult result;
- unique_lock<mutex> l(lock_);
+ bool notify_exec_done = false;
+ {
+ unique_lock<mutex> l(lock_);
- // Nothing to cancel if the exec rpc was not sent.
- if (!exec_rpc_sent_) {
- if (status_.ok()) {
- status_ = Status::CANCELLED;
- result.became_done = true;
+ // Nothing to cancel if the exec rpc was not sent.
+ if (!exec_rpc_sent_) {
+ if (status_.ok()) {
+ status_ = Status::CANCELLED;
+ result.became_done = true;
+ }
+ VLogForBackend("Not sending Cancel() rpc because nothing was started.");
+ exec_done_ = true;
+ notify_exec_done = true;
+ goto done;
}
- VLogForBackend("Not sending Cancel() rpc because nothing was started.");
- exec_done_ = true;
- // Notify after releasing 'lock_' so that we don't wake up a thread just to have it
- // immediately block again.
- l.unlock();
- exec_done_cv_.NotifyAll();
- return result;
- }
- // If the exec rpc was sent but the callback hasn't been executed, try to cancel the rpc
- // and then wait for it to be done.
- if (!exec_done_) {
- VLogForBackend("Attempting to cancel Exec() rpc");
- cancel_exec_rpc_ = true;
- exec_rpc_controller_.Cancel();
- WaitOnExecLocked(&l);
- }
+ // If the exec rpc was sent but the callback hasn't been executed, try to cancel the
+ // rpc and then wait for it to be done.
+ if (!exec_done_) {
+ VLogForBackend("Attempting to cancel Exec() rpc");
+ cancel_exec_rpc_ = true;
+ exec_rpc_controller_.Cancel();
+ if (!WaitOnExecLocked(&l, (int64_t)FLAGS_backend_client_rpc_timeout_ms)) {
+ VLogForBackend(Substitute(
+ "Exec() rpc was not responsive after waiting for $0 ms",
+ FLAGS_backend_client_rpc_timeout_ms));
+ exec_done_ = true;
+ notify_exec_done = true;
+ }
+ }
- // Don't cancel if we're done or already sent an RPC. Note that its possible the
- // backend is still running, eg. if the rpc layer reported that the Exec() rpc failed
- // but it actually reached the backend. In that case, the backend will cancel itself
- // the first time it tries to send a status report and the coordinator responds with
- // an error.
- if (IsDoneLocked(l)) {
- VLogForBackend(Substitute(
- "Not cancelling because the backend is already done: $0", status_.GetDetail()));
- return result;
- } else if (sent_cancel_rpc_) {
- DCHECK(status_.ok());
- // If we did a fire_and_forget=false followed by fire_and_forget=true.
- if (fire_and_forget) {
- status_ = Status::CANCELLED;
+ // Don't cancel if we're done or already sent an RPC. Note that it's possible the
+ // backend is still running, e.g. if the rpc layer reported that the Exec() rpc failed
+ // but it actually reached the backend. In that case, the backend will cancel itself
+ // the first time it tries to send a status report and the coordinator responds with
+ // an error.
+ if (IsDoneLocked(l)) {
+ VLogForBackend(Substitute(
+ "Not cancelling because the backend is already done: $0", status_.GetDetail()));
+ goto done;
+ } else if (sent_cancel_rpc_) {
+ DCHECK(status_.ok());
+ // If we did a fire_and_forget=false followed by fire_and_forget=true.
+ if (fire_and_forget) {
+ status_ = Status::CANCELLED;
+ result.became_done = true;
+ }
+ VLogForBackend(Substitute(
+ "Not cancelling because cancel RPC already sent: $0", status_.GetDetail()));
+ goto done;
+ }
+
+ // Avoid sending redundant cancel RPCs.
+ sent_cancel_rpc_ = true;
+ result.cancel_attempted = true;
+ // Set the status to CANCELLED if we are firing and forgetting.
+ if (fire_and_forget && status_.ok()) {
result.became_done = true;
+ status_ = Status::CANCELLED;
}
- VLogForBackend(Substitute(
- "Not cancelling because cancel RPC already sent: $0", status_.GetDetail()));
- return result;
- }
- // Avoid sending redundant cancel RPCs.
- sent_cancel_rpc_ = true;
- result.cancel_attempted = true;
- // Set the status to CANCELLED if we are firing and forgetting.
- if (fire_and_forget && status_.ok()) {
- result.became_done = true;
- status_ = Status::CANCELLED;
- }
+ VLogForBackend("Sending CancelQueryFInstances rpc");
+
+ std::unique_ptr<ControlServiceProxy> proxy;
+ Status get_proxy_status = ControlService::GetProxy(
+ FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
+ if (!get_proxy_status.ok()) {
+ status_.MergeStatus(get_proxy_status);
+ result.became_done = true;
+ VLogForBackend(Substitute("Could not get proxy: $0", get_proxy_status.msg().msg()));
+ goto done;
+ }
- VLogForBackend("Sending CancelQueryFInstances rpc");
+ CancelQueryFInstancesRequestPB request;
+ *request.mutable_query_id() = query_id_;
+ CancelQueryFInstancesResponsePB response;
- std::unique_ptr<ControlServiceProxy> proxy;
- Status get_proxy_status = ControlService::GetProxy(
- FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
- if (!get_proxy_status.ok()) {
- status_.MergeStatus(get_proxy_status);
- result.became_done = true;
- VLogForBackend(Substitute("Could not get proxy: $0", get_proxy_status.msg().msg()));
- return result;
- }
+ const int num_retries = 3;
+ const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+ const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+ Status rpc_status =
+ RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances,
+ request, &response, query_ctx_, "Cancel() RPC failed", num_retries,
+ timeout_ms, backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
- CancelQueryFInstancesRequestPB request;
- *request.mutable_query_id() = query_id_;
- CancelQueryFInstancesResponsePB response;
-
- const int num_retries = 3;
- const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
- const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
- Status rpc_status =
- RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
- &response, query_ctx_, "Cancel() RPC failed", num_retries, timeout_ms,
- backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
-
- if (!rpc_status.ok()) {
- status_.MergeStatus(rpc_status);
- result.became_done = true;
- VLogForBackend(
- Substitute("CancelQueryFInstances rpc failed: $0", rpc_status.msg().msg()));
- return result;
- }
- Status cancel_status = Status(response.status());
- if (!cancel_status.ok()) {
- status_.MergeStatus(cancel_status);
- result.became_done = true;
- VLogForBackend(
- Substitute("CancelQueryFInstances failed: $0", cancel_status.msg().msg()));
- return result;
+ if (!rpc_status.ok()) {
+ status_.MergeStatus(rpc_status);
+ result.became_done = true;
+ VLogForBackend(
+ Substitute("CancelQueryFInstances rpc failed: $0", rpc_status.msg().msg()));
+ goto done;
+ }
+ Status cancel_status = Status(response.status());
+ if (!cancel_status.ok()) {
+ status_.MergeStatus(cancel_status);
+ result.became_done = true;
+ VLogForBackend(
+ Substitute("CancelQueryFInstances failed: $0", cancel_status.msg().msg()));
+ }
}
+done:
+ if (notify_exec_done) exec_done_cv_.NotifyAll();
return result;
}
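Note the shape of the restructured Cancel(): every early return became a
goto done inside a single lock scope, and exec_done_cv_ is notified at the
done: label only after 'lock_' has been released, so a woken waiter is not
immediately blocked on the lock again. A minimal standalone sketch of that
notify-outside-the-lock pattern (all names here are hypothetical):

    #include <condition_variable>
    #include <mutex>

    std::mutex lock_;
    std::condition_variable exec_done_cv_;
    bool exec_done_ = false;

    void MarkExecDone() {
      bool notify = false;
      {
        std::lock_guard<std::mutex> l(lock_);
        if (!exec_done_) {
          exec_done_ = true;
          notify = true;  // Remember to notify, but do it outside the lock.
        }
      }
      // Notifying after unlock lets the woken thread acquire 'lock_' at once
      // instead of waking only to block on the still-held mutex.
      if (notify) exec_done_cv_.notify_all();
    }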
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index a465d53a1..2d4053c87 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -450,8 +450,11 @@ class Coordinator::BackendState {
void SetExecError(
const Status& status, TypedCountingBarrier<Status>* exec_status_barrier);
- /// Same as WaitOnExecRpc(), except 'l' must own 'lock_'.
- void WaitOnExecLocked(std::unique_lock<std::mutex>* l);
+ /// Waits until the ExecQueryFInstances() rpc has completed or the wait times out.
+ /// 'l' must own 'lock_'. 'timeout_ms' specifies the timeout in milliseconds; if it
+ /// is less than or equal to 0, waits indefinitely until the rpc has completed.
+ /// Returns true if the rpc completed in time, false otherwise.
+ bool WaitOnExecLocked(std::unique_lock<std::mutex>* l, int64_t timeout_ms = 0);
/// Called when the ExecQueryFInstances() rpc completes. Notifies 'exec_status_barrier'
/// with the status. 'start' is the MonotonicMillis() timestamp when the rpc was sent.
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 797efb2c5..3023de2f7 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -19,9 +19,17 @@ import pytest
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_cluster import ImpalaCluster
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfGCS, SkipIfCOS
from tests.verifiers.metric_verifier import MetricVerifier
+# The BE krpc port of the impalad on which rpc errors are simulated in tests.
+FAILED_KRPC_PORT = 27001
+
+
+def _get_rpc_fail_action(port):
+ return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
+ .format(port=port)
+
@SkipIfBuildType.not_dev_build
class TestRPCTimeout(CustomClusterTestSuite):
"""Tests for every Impala RPC timeout handling, query should not hang and
@@ -194,6 +202,29 @@ class TestRPCTimeout(CustomClusterTestSuite):
self.execute_query_verify_metrics(self.SLOW_TEST_QUERY,
expected_exception="cancelled due to unresponsive backend")
+ @SkipIfGCS.jira(reason="IMPALA-10562")
+ @SkipIfCOS.jira(reason="IMPALA-10562")
+ @SkipIfBuildType.not_dev_build
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--backend_client_rpc_timeout_ms=1000 --debug_actions=" +
+ _get_rpc_fail_action(FAILED_KRPC_PORT),
+ statestored_args="--statestore_heartbeat_frequency_ms=1000 \
+ --statestore_max_missed_heartbeats=2")
+ def test_miss_complete_cb(self, unique_database):
+ """Test verify cancellation should not be blocked if the callback of ExecComplate
+ are missing."""
+
+ rpc_not_accessible_impalad = self.cluster.impalads[1]
+ assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+ # The 2nd node is not accessible through KRPC, so it is added to the blacklist
+ # and the query should be aborted without hanging.
+ query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
+ debug_action = 'IMPALA_MISS_EXEC_COMPLETE_CB:FAIL@1.0'
+ ex = self.execute_query_expect_failure(self.client, query,
+ query_options={'retry_failed_queries': 'false', 'debug_action': debug_action})
+ assert "Query aborted" in str(ex)
class TestCatalogRPCTimeout(CustomClusterTestSuite):
""""Tests RPC timeout and retry handling for catalogd operations."""