You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2022/04/30 05:26:56 UTC

[impala] branch master updated: IMPALA-11263: Coordinator hang when cancelling a query

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 38f68a31e IMPALA-11263: Coordinator hang when cancelling a query
38f68a31e is described below

commit 38f68a31edcd2ff4ef52c47f33b8999082750645
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Wed Apr 13 21:19:51 2022 -0700

    IMPALA-11263: Coordinator hang when cancelling a query
    
    In a rare case, the callback Coordinator::BackendState::ExecCompleteCb()
    is not called for the corresponding ExecQueryFInstances RPC when the
    RPC is cancelled. This causes the coordinator to wait indefinitely when
    calling Coordinator::BackendState::Cancel() to cancel a fragment
    instance.
    
    This patch adds a timeout to BackendState::WaitOnExecLocked() so that
    the coordinator is not blocked indefinitely when cancelling a query.
    
    Testing:
     - Added a test case that simulates the missing callback when a query
       fails. Verified that the coordinator hangs without the fix and
       does not hang with the fix.
     - Passed exhaustive-debug tests.
    
    Change-Id: I915511afe2df3017cbbf37f6aff3c5ff7f5473be
    Reviewed-on: http://gerrit.cloudera.org:8080/18439
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc | 200 ++++++++++++++++------------
 be/src/runtime/coordinator-backend-state.h  |   7 +-
 tests/custom_cluster/test_rpc_timeout.py    |  33 ++++-
 3 files changed, 149 insertions(+), 91 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 98e0938c3..5b72ae946 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -195,14 +195,22 @@ void Coordinator::BackendState::SetExecError(
 
 void Coordinator::BackendState::WaitOnExecRpc() {
   unique_lock<mutex> l(lock_);
-  WaitOnExecLocked(&l);
+  discard_result(WaitOnExecLocked(&l));
 }
 
-void Coordinator::BackendState::WaitOnExecLocked(unique_lock<mutex>* l) {
+bool Coordinator::BackendState::WaitOnExecLocked(
+    unique_lock<mutex>* l, int64_t timeout_ms) {
   DCHECK(l->owns_lock());
-  while (!exec_done_) {
-    exec_done_cv_.Wait(*l);
+  bool timed_out = false;
+  while (!exec_done_ && !timed_out) {
+    if (timeout_ms <= 0) {
+      exec_done_cv_.Wait(*l);
+    } else {
+      // WaitFor returns true if notified in time.
+      timed_out = !exec_done_cv_.WaitFor(*l, timeout_ms * MICROS_PER_MILLI);
+    }
   }
+  return !timed_out;
 }
 
 void Coordinator::BackendState::ExecCompleteCb(
@@ -210,6 +218,15 @@ void Coordinator::BackendState::ExecCompleteCb(
   {
     lock_guard<mutex> l(lock_);
     exec_rpc_status_ = exec_rpc_controller_.status();
+
+    Status complete_cb_debug_status =
+        DebugAction(exec_params_.query_options(), "IMPALA_MISS_EXEC_COMPLETE_CB");
+    if (UNLIKELY(exec_rpc_status_.ok() && !complete_cb_debug_status.ok())) {
+      // Simulate a missing callback for a successful RPC.
+      LOG(ERROR) << "Debug action: missing ExecComplete callback";
+      return;
+    }
+
     rpc_latency_ = MonotonicMillis() - start_ms;
 
     if (!exec_rpc_status_.ok()) {
@@ -579,101 +596,108 @@ Coordinator::BackendState::CancelResult Coordinator::BackendState::Cancel(
     bool fire_and_forget) {
   // Update 'result' based on the actions we take in this function and/or errors we hit.
   CancelResult result;
-  unique_lock<mutex> l(lock_);
+  bool notify_exec_done = false;
+  {
+    unique_lock<mutex> l(lock_);
 
-  // Nothing to cancel if the exec rpc was not sent.
-  if (!exec_rpc_sent_) {
-    if (status_.ok()) {
-      status_ = Status::CANCELLED;
-      result.became_done = true;
+    // Nothing to cancel if the exec rpc was not sent.
+    if (!exec_rpc_sent_) {
+      if (status_.ok()) {
+        status_ = Status::CANCELLED;
+        result.became_done = true;
+      }
+      VLogForBackend("Not sending Cancel() rpc because nothing was started.");
+      exec_done_ = true;
+      notify_exec_done = true;
+      goto done;
     }
-    VLogForBackend("Not sending Cancel() rpc because nothing was started.");
-    exec_done_ = true;
-    // Notify after releasing 'lock_' so that we don't wake up a thread just to have it
-    // immediately block again.
-    l.unlock();
-    exec_done_cv_.NotifyAll();
-    return result;
-  }
 
-  // If the exec rpc was sent but the callback hasn't been executed, try to cancel the rpc
-  // and then wait for it to be done.
-  if (!exec_done_) {
-    VLogForBackend("Attempting to cancel Exec() rpc");
-    cancel_exec_rpc_ = true;
-    exec_rpc_controller_.Cancel();
-    WaitOnExecLocked(&l);
-  }
+    // If the exec rpc was sent but the callback hasn't been executed, try to cancel the
+    // rpc and then wait for it to be done.
+    if (!exec_done_) {
+      VLogForBackend("Attempting to cancel Exec() rpc");
+      cancel_exec_rpc_ = true;
+      exec_rpc_controller_.Cancel();
+      if (!WaitOnExecLocked(&l, (int64_t)FLAGS_backend_client_rpc_timeout_ms)) {
+        VLogForBackend(Substitute(
+            "Exec() rpc was not responsive after waiting for $0 ms",
+            FLAGS_backend_client_rpc_timeout_ms));
+        exec_done_ = true;
+        notify_exec_done = true;
+      }
+    }
 
-  // Don't cancel if we're done or already sent an RPC. Note that its possible the
-  // backend is still running, eg. if the rpc layer reported that the Exec() rpc failed
-  // but it actually reached the backend. In that case, the backend will cancel itself
-  // the first time it tries to send a status report and the coordinator responds with
-  // an error.
-  if (IsDoneLocked(l)) {
-    VLogForBackend(Substitute(
-        "Not cancelling because the backend is already done: $0", status_.GetDetail()));
-    return result;
-  } else if (sent_cancel_rpc_) {
-    DCHECK(status_.ok());
-    // If we did a fire_and_forget=false followed by fire_and_forget=true.
-    if (fire_and_forget) {
-      status_ = Status::CANCELLED;
+    // Don't cancel if we're done or already sent an RPC. Note that its possible the
+    // backend is still running, eg. if the rpc layer reported that the Exec() rpc failed
+    // but it actually reached the backend. In that case, the backend will cancel itself
+    // the first time it tries to send a status report and the coordinator responds with
+    // an error.
+    if (IsDoneLocked(l)) {
+      VLogForBackend(Substitute(
+          "Not cancelling because the backend is already done: $0", status_.GetDetail()));
+      goto done;
+    } else if (sent_cancel_rpc_) {
+      DCHECK(status_.ok());
+      // If we did a fire_and_forget=false followed by fire_and_forget=true.
+      if (fire_and_forget) {
+        status_ = Status::CANCELLED;
+        result.became_done = true;
+      }
+      VLogForBackend(Substitute(
+          "Not cancelling because cancel RPC already sent: $0", status_.GetDetail()));
+      goto done;
+    }
+
+    // Avoid sending redundant cancel RPCs.
+    sent_cancel_rpc_ = true;
+    result.cancel_attempted = true;
+    // Set the status to CANCELLED if we are firing and forgetting.
+    if (fire_and_forget && status_.ok()) {
       result.became_done = true;
+      status_ = Status::CANCELLED;
     }
-    VLogForBackend(Substitute(
-        "Not cancelling because cancel RPC already sent: $0", status_.GetDetail()));
-    return result;
-  }
 
-  // Avoid sending redundant cancel RPCs.
-  sent_cancel_rpc_ = true;
-  result.cancel_attempted = true;
-  // Set the status to CANCELLED if we are firing and forgetting.
-  if (fire_and_forget && status_.ok()) {
-    result.became_done = true;
-    status_ = Status::CANCELLED;
-  }
+    VLogForBackend("Sending CancelQueryFInstances rpc");
+
+    std::unique_ptr<ControlServiceProxy> proxy;
+    Status get_proxy_status = ControlService::GetProxy(
+        FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
+    if (!get_proxy_status.ok()) {
+      status_.MergeStatus(get_proxy_status);
+      result.became_done = true;
+      VLogForBackend(Substitute("Could not get proxy: $0", get_proxy_status.msg().msg()));
+      goto done;
+    }
 
-  VLogForBackend("Sending CancelQueryFInstances rpc");
+    CancelQueryFInstancesRequestPB request;
+    *request.mutable_query_id() = query_id_;
+    CancelQueryFInstancesResponsePB response;
 
-  std::unique_ptr<ControlServiceProxy> proxy;
-  Status get_proxy_status = ControlService::GetProxy(
-      FromNetworkAddressPB(krpc_host_), host_.hostname(), &proxy);
-  if (!get_proxy_status.ok()) {
-    status_.MergeStatus(get_proxy_status);
-    result.became_done = true;
-    VLogForBackend(Substitute("Could not get proxy: $0", get_proxy_status.msg().msg()));
-    return result;
-  }
+    const int num_retries = 3;
+    const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
+    const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
+    Status rpc_status =
+        RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances,
+            request, &response, query_ctx_, "Cancel() RPC failed", num_retries,
+            timeout_ms, backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
 
-  CancelQueryFInstancesRequestPB request;
-  *request.mutable_query_id() = query_id_;
-  CancelQueryFInstancesResponsePB response;
-
-  const int num_retries = 3;
-  const int64_t timeout_ms = 10 * MILLIS_PER_SEC;
-  const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
-  Status rpc_status =
-      RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
-          &response, query_ctx_, "Cancel() RPC failed", num_retries, timeout_ms,
-          backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
-
-  if (!rpc_status.ok()) {
-    status_.MergeStatus(rpc_status);
-    result.became_done = true;
-    VLogForBackend(
-        Substitute("CancelQueryFInstances rpc failed: $0", rpc_status.msg().msg()));
-    return result;
-  }
-  Status cancel_status = Status(response.status());
-  if (!cancel_status.ok()) {
-    status_.MergeStatus(cancel_status);
-    result.became_done = true;
-    VLogForBackend(
-        Substitute("CancelQueryFInstances failed: $0", cancel_status.msg().msg()));
-    return result;
+    if (!rpc_status.ok()) {
+      status_.MergeStatus(rpc_status);
+      result.became_done = true;
+      VLogForBackend(
+          Substitute("CancelQueryFInstances rpc failed: $0", rpc_status.msg().msg()));
+      goto done;
+    }
+    Status cancel_status = Status(response.status());
+    if (!cancel_status.ok()) {
+      status_.MergeStatus(cancel_status);
+      result.became_done = true;
+      VLogForBackend(
+          Substitute("CancelQueryFInstances failed: $0", cancel_status.msg().msg()));
+    }
   }
+done:
+  if (notify_exec_done) exec_done_cv_.NotifyAll();
   return result;
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index a465d53a1..2d4053c87 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -450,8 +450,11 @@ class Coordinator::BackendState {
   void SetExecError(
       const Status& status, TypedCountingBarrier<Status>* exec_status_barrier);
 
-  /// Same as WaitOnExecRpc(), except 'l' must own 'lock_'.
-  void WaitOnExecLocked(std::unique_lock<std::mutex>* l);
+  /// Waits until the ExecQueryFInstances() rpc has completed or the wait times out.
+  /// 'l' must own 'lock_'. 'timeout_ms' specifies the timeout in milliseconds. Waits
+  /// indefinitely until the rpc has completed if 'timeout_ms' is less than or equal
+  /// to 0. Returns true if the rpc completed in time, false otherwise.
+  bool WaitOnExecLocked(std::unique_lock<std::mutex>* l, int64_t timeout_ms = 0);
 
   /// Called when the ExecQueryFInstances() rpc completes. Notifies 'exec_status_barrier'
   /// with the status. 'start' is the MonotonicMillis() timestamp when the rpc was sent.
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 797efb2c5..3023de2f7 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -19,9 +19,17 @@ import pytest
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import ImpalaCluster
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfGCS, SkipIfCOS
 from tests.verifiers.metric_verifier import MetricVerifier
 
+# The BE krpc port of the impalad to simulate rpc errors in tests.
+FAILED_KRPC_PORT = 27001
+
+
+def _get_rpc_fail_action(port):
+  return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
+      .format(port=port)
+
 @SkipIfBuildType.not_dev_build
 class TestRPCTimeout(CustomClusterTestSuite):
   """Tests for every Impala RPC timeout handling, query should not hang and
@@ -194,6 +202,29 @@ class TestRPCTimeout(CustomClusterTestSuite):
     self.execute_query_verify_metrics(self.SLOW_TEST_QUERY,
         expected_exception="cancelled due to unresponsive backend")
 
+  @SkipIfGCS.jira(reason="IMPALA-10562")
+  @SkipIfCOS.jira(reason="IMPALA-10562")
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--backend_client_rpc_timeout_ms=1000 --debug_actions=" +
+      _get_rpc_fail_action(FAILED_KRPC_PORT),
+      statestored_args="--statestore_heartbeat_frequency_ms=1000 \
+          --statestore_max_missed_heartbeats=2")
+  def test_miss_complete_cb(self, unique_database):
+    """Verify that cancellation is not blocked if the ExecComplete callback is
+    missing."""
+
+    rpc_not_accessible_impalad = self.cluster.impalads[1]
+    assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    # The 2nd node is not accessible through KRPC, so it is added to the blacklist
+    # and the query should be aborted without hanging.
+    query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
+    debug_action = 'IMPALA_MISS_EXEC_COMPLETE_CB:FAIL@1.0'
+    ex = self.execute_query_expect_failure(self.client, query,
+        query_options={'retry_failed_queries': 'false', 'debug_action': debug_action})
+    assert "Query aborted" in str(ex)
 
 class TestCatalogRPCTimeout(CustomClusterTestSuite):
   """"Tests RPC timeout and retry handling for catalogd operations."""