You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/07/21 16:22:07 UTC

[impala] 01/03: IMPALA-6788: Abort ExecFInstance() RPC loop early after query failure

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6cdc8b5ce7fd50bbb54d846594262ba766b22bed
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Mon Jul 13 12:24:19 2020 -0700

    IMPALA-6788: Abort ExecFInstance() RPC loop early after query failure
    
    Stops issuing ExecQueryFInstance rpcs and cancels any inflight rpcs when
    a backend reports failure.
    Adds new debug action CONSTRUCT_QUERY_STATE_REPORT that runs when
    constructing a query state report.
    Adds a new test case for handling errors reported from query state.
    
    Testing:
     - Ran the following command for the new test case and verified that the
       code works as expected:
         ./bin/impala-py.test tests/custom_cluster/test_rpc_exception.py\
           ::TestRPCException::test_state_report_error \
           --workload_exploration_strategy=functional-query:exhaustive
     - Passed exhaustive tests.
    
    Change-Id: I034788f7720fc97c25c54f006ff72dce6cb199c3
    Reviewed-on: http://gerrit.cloudera.org:8080/16192
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc              |  9 ++++++---
 be/src/runtime/query-state.cc              |  5 +++++
 tests/custom_cluster/test_rpc_exception.py | 29 +++++++++++++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3551849..b57d66f 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -946,9 +946,12 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
     if (!status.ok()) {
       // We may start receiving status reports before all exec rpcs are complete.
       // Can't apply state transition until no more exec rpcs will be sent.
-      // TODO(IMPALA-6788): we should stop issuing ExecQueryFInstance rpcs and cancel any
-      // inflight when this happens.
-      WaitOnExecRpcs();
+      // Stop issuing ExecQueryFInstance rpcs and cancel any inflight rpcs
+      // when this happens.
+      if (!exec_rpcs_complete_.Load()) {
+        if (!status.IsCancelled()) exec_rpcs_status_barrier_.NotifyRemaining(status);
+        WaitOnExecRpcs();
+      }
 
       // Transition the status if we're not already in a terminal state. This won't block
       // because either this transitions to an ERROR state or the query is already in
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 35d8c1f..742d33f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -427,6 +427,11 @@ void QueryState::ConstructReport(bool instances_started,
   report->set_coord_state_idx(exec_rpc_params_.coord_state_idx());
   {
     unique_lock<SpinLock> l(status_lock_);
+
+    Status debug_action_status =
+        DebugAction(query_options(), "CONSTRUCT_QUERY_STATE_REPORT");
+    if (UNLIKELY(!debug_action_status.ok())) overall_status_ = debug_action_status;
+
     overall_status_.ToProto(report->mutable_overall_status());
     if (IsValidFInstanceId(failed_finstance_id_)) {
       TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id());
diff --git a/tests/custom_cluster/test_rpc_exception.py b/tests/custom_cluster/test_rpc_exception.py
index d4701c5..4921ca9 100644
--- a/tests/custom_cluster/test_rpc_exception.py
+++ b/tests/custom_cluster/test_rpc_exception.py
@@ -157,3 +157,32 @@ class TestRPCException(CustomClusterTestSuite):
     elapsed_s = time.time() - start_s
     assert elapsed_s < 100, "Query took longer than expected to fail: %ss" % elapsed_s
     self.client.set_configuration_option("DEBUG_ACTION", "")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--debug_actions=" +
+      _get_rpc_debug_action(rpc=EXEC_RPC, action="SLEEP@1000000"))
+  def test_state_report_error(self):
+    """Test that verifies that when one backend reports failure, the other Exec() rpcs
+    are immediately cancelled and the query returns an error quickly."""
+    # Debug action to cause executor to construct a state report with failure. When
+    # the state report is processed by the coordinator, it sends a signal to stop
+    # issuing ExecQueryFInstance rpcs and cancel any inflight.
+    # The Exec() rpc to one of the impalads will sleep for a long time before hitting
+    # this (due to the impalad debug_actions startup flag specified above), so one
+    # Exec() will fail quickly while the other one will fail only after a long wait.
+    self.client.set_configuration_option("DEBUG_ACTION",
+        "CONSTRUCT_QUERY_STATE_REPORT:FAIL")
+
+    start_s = time.time()
+    try:
+      self.client.execute(self.TEST_QUERY)
+      assert False, "query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert "Debug Action: CONSTRUCT_QUERY_STATE_REPORT:FAIL" in str(e)
+
+    # If we successfully cancelled all Exec() rpcs and returned to the client as soon as
+    # the fast Exec() reports failure, the time to run the query should be much less than
+    # the sleep time for the slow Exec() of 1000s.
+    elapsed_s = time.time() - start_s
+    assert elapsed_s < 100, "Query took longer than expected to fail: %ss" % elapsed_s
+    self.client.set_configuration_option("DEBUG_ACTION", "")