Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/25 07:57:24 UTC

[impala] 10/14: IMPALA-7205: Respond to ReportExecStatus() RPC with CANCELLED if query execution has terminated

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e5a243e165baa92ae716f1535aaaf4fba5347aad
Author: Dan Hecht <dh...@cloudera.com>
AuthorDate: Mon Jun 25 12:05:16 2018 -0700

    IMPALA-7205: Respond to ReportExecStatus() RPC with CANCELLED if query execution has terminated
    
    Otherwise, if the coordinator-to-backend CancelFInstances() RPC has failed,
    the query can hang (and/or finstances can continue running until the
    query is closed).
    
    Testing:
    - the modified test reproduces the hang without the impalad fix
    
    Change-Id: I7bb2c26edace89853f14a329f891d1f9a065a991
    Reviewed-on: http://gerrit.cloudera.org:8080/10815
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc | 50 ++++++++++++++++-------------
 be/src/runtime/coordinator-backend-state.h  | 10 ++++--
 be/src/runtime/coordinator.cc               |  9 +++---
 tests/query_test/test_cancellation.py       | 20 ++++++++----
 4 files changed, 51 insertions(+), 38 deletions(-)
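
The gist of the fix: the coordinator already answered a backend's
ReportExecStatus() RPC with CANCELLED once all results had been returned;
this change broadens that to any non-executing state, so a backend that
never received (or lost) the CancelQueryFInstances() RPC still tears itself
down on its next status report. A minimal sketch of the new response rule,
using stand-ins rather than Impala's real types:

    // Hedged sketch, not the real Coordinator class. ExecState mirrors the
    // states referenced by this patch; EXECUTING is the only non-terminal one.
    enum class ExecState { EXECUTING, RETURNED_RESULTS, CANCELLED, ERROR };

    // The return value becomes the RPC response to ReportExecStatus().
    // Before this patch: CANCELLED only once all results were returned.
    // After this patch: CANCELLED whenever the query is no longer executing.
    Status UpdateBackendExecStatus(ExecState current_state) {
      return current_state == ExecState::EXECUTING ? Status::OK()
                                                   : Status::CANCELLED;
    }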

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index c871a48..a99acdb 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -40,8 +40,8 @@ using namespace rapidjson;
 namespace accumulators = boost::accumulators;
 
 Coordinator::BackendState::BackendState(
-    const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
-  : query_id_(query_id),
+    const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode)
+  : coord_(coord),
     state_idx_(state_idx),
     filter_mode_(filter_mode) {
 }
@@ -150,16 +150,16 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
 }
 
 void Coordinator::BackendState::Exec(
-    const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+    const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table,
     CountingBarrier* exec_complete_barrier) {
   NotifyBarrierOnExit notifier(exec_complete_barrier);
   TExecQueryFInstancesParams rpc_params;
-  rpc_params.__set_query_ctx(query_ctx);
+  rpc_params.__set_query_ctx(query_ctx());
   SetRpcParams(debug_options, filter_routing_table, &rpc_params);
   VLOG_FILE << "making rpc: ExecQueryFInstances"
       << " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
-      << PrintId(query_id_);
+      << PrintId(query_id());
 
   // guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns
   lock_guard<mutex> l(lock_);
@@ -180,7 +180,7 @@ void Coordinator::BackendState::Exec(
 
   if (!rpc_status.ok()) {
     const string& err_msg =
-        Substitute(ERR_TEMPLATE, PrintId(query_id_), rpc_status.msg().msg());
+        Substitute(ERR_TEMPLATE, PrintId(query_id()), rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
     status_ = Status::Expected(err_msg);
     return;
@@ -188,7 +188,7 @@ void Coordinator::BackendState::Exec(
 
   Status exec_status = Status(thrift_result.status);
   if (!exec_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id_),
+    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
         exec_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
     status_ = Status::Expected(err_msg);
@@ -196,7 +196,7 @@ void Coordinator::BackendState::Exec(
   }
 
   for (const auto& entry: instance_stats_map_) entry.second->stopwatch_.Start();
-  VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
+  VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id());
 }
 
 Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
@@ -225,7 +225,7 @@ void Coordinator::BackendState::LogFirstInProgress(
   for (Coordinator::BackendState* backend_state : backend_states) {
     lock_guard<mutex> l(backend_state->lock_);
     if (!backend_state->IsDone()) {
-      VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id_)
+      VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id())
                  << ": first in-progress backend: "
                  << TNetworkAddressToString(backend_state->impalad_address());
       break;
@@ -351,9 +351,9 @@ bool Coordinator::BackendState::Cancel() {
 
   TCancelQueryFInstancesParams params;
   params.protocol_version = ImpalaInternalServiceVersion::V1;
-  params.__set_query_id(query_id_);
+  params.__set_query_id(query_id());
   TCancelQueryFInstancesResult dummy;
-  VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id_) <<
+  VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id()) <<
       " backend=" << TNetworkAddressToString(impalad_address());
 
   Status rpc_status;
@@ -362,26 +362,30 @@ bool Coordinator::BackendState::Cancel() {
   for (int i = 0; i < 3; ++i) {
     ImpalaBackendConnection backend_client(ExecEnv::GetInstance()->impalad_client_cache(),
         impalad_address(), &client_status);
-    if (client_status.ok()) {
-      // The return value 'dummy' is ignored as it's only set if the fragment instance
-      // cannot be found in the backend. The fragment instances of a query can all be
-      // cancelled locally in a backend due to RPC failure to coordinator. In which case,
-      // the query state can be gone already.
-      rpc_status = backend_client.DoRpc(
-          &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
-      if (rpc_status.ok()) break;
-    }
+    if (!client_status.ok()) continue;
+
+    rpc_status = DebugAction(query_ctx().client_request.query_options,
+        "COORD_CANCEL_QUERY_FINSTANCES_RPC");
+    if (!rpc_status.ok()) continue;
+
+    // The return value 'dummy' is ignored as it's only set if the fragment
+    // instance cannot be found in the backend. The fragment instances of a query
+    // can all be cancelled locally in a backend due to RPC failure to
+    // coordinator. In which case, the query state can be gone already.
+    rpc_status = backend_client.DoRpc(
+        &ImpalaBackendClient::CancelQueryFInstances, params, &dummy);
+    if (rpc_status.ok()) break;
   }
   if (!client_status.ok()) {
     status_.MergeStatus(client_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_)
+    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
                << " failed to connect to " << TNetworkAddressToString(impalad_address())
                << " :" << client_status.msg().msg();
     return true;
   }
   if (!rpc_status.ok()) {
     status_.MergeStatus(rpc_status);
-    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id_)
+    VLOG_QUERY << "CancelQueryFInstances query_id= " << PrintId(query_id())
                << " rpc to " << TNetworkAddressToString(impalad_address())
                << " failed: " << rpc_status.msg().msg();
     return true;
@@ -390,7 +394,7 @@ bool Coordinator::BackendState::Cancel() {
 }
 
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(rpc_params.dst_query_id == query_id_);
+  DCHECK(rpc_params.dst_query_id == query_id());
   {
     // If the backend is already done, it's not waiting for this filter, so we skip
     // sending it in this case.
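
The rewritten retry loop in Cancel() above flattens the old nested ifs into
early continues and, more importantly, threads a DebugAction() test hook in
front of the real RPC: setting the COORD_CANCEL_QUERY_FINSTANCES_RPC action
to FAIL makes each of the three attempts report a failed cancel without
touching the network. A condensed sketch of the shape (MakeClient() and
DoCancelRpc() are hypothetical stand-ins for the client-cache lookup and the
DoRpc() call, and query_options for the query's TQueryOptions):

    Status client_status, rpc_status;
    for (int i = 0; i < 3; ++i) {
      client_status = MakeClient();
      if (!client_status.ok()) continue;  // no connection: retry
      // Injected failure point; a FAIL action yields a non-OK Status.
      rpc_status = DebugAction(query_options, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
      if (!rpc_status.ok()) continue;     // simulated RPC failure: retry
      rpc_status = DoCancelRpc();         // the real CancelQueryFInstances()
      if (rpc_status.ok()) break;         // success: stop retrying
    }

After three injected failures rpc_status is still non-OK, so Cancel() merges
it into status_, logs, and returns; the query then depends on the CANCELLED
response to ReportExecStatus() (the coordinator.cc change below) to actually
stop the backend.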
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index e7af2e2..c51c16c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -54,7 +54,7 @@ struct FInstanceExecParams;
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
-  BackendState(const TUniqueId& query_id, int state_idx,
+  BackendState(const Coordinator& coord, int state_idx,
       TRuntimeFilterMode::type filter_mode);
 
   /// Creates InstanceStats for all instance in backend_exec_params in obj_pool
@@ -70,7 +70,7 @@ class Coordinator::BackendState {
   /// that weren't selected during its construction.
   /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
   /// on their node_id/instance_idx.
-  void Exec(const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+  void Exec(const DebugOptions& debug_options,
       const FilterRoutingTable& filter_routing_table,
       CountingBarrier* rpc_complete_barrier);
 
@@ -202,7 +202,8 @@ class Coordinator::BackendState {
     void InitCounters();
   };
 
-  const TUniqueId query_id_;
+  const Coordinator& coord_; /// Coordinator object that owns this BackendState
+
   const int state_idx_;  /// index of 'this' in Coordinator::backend_states_
   const TRuntimeFilterMode::type filter_mode_;
 
@@ -256,6 +257,9 @@ class Coordinator::BackendState {
   /// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
   int64_t last_report_time_ms_ = 0;
 
+  const TQueryCtx& query_ctx() const { return coord_.query_ctx(); }
+  const TUniqueId& query_id() const { return coord_.query_id(); }
+
   /// Fill in rpc_params based on state. Uses filter_routing_table to remove filters
   /// that weren't selected during its construction.
   void SetRpcParams(const DebugOptions& debug_options,
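
Why the constructor change: BackendState used to copy the query id and have
query-wide data passed into Exec(); it now holds a reference to its owning
Coordinator and derives query_id()/query_ctx() from it. That is what gives
Cancel() access to the query options needed for the new DebugAction() hook
without widening its signature. Roughly (thrift types assumed, all other
members elided):

    class Coordinator {
     public:
      const TUniqueId& query_id() const;
      const TQueryCtx& query_ctx() const;
      class BackendState;
    };

    class Coordinator::BackendState {
     public:
      explicit BackendState(const Coordinator& coord) : coord_(coord) {}
     private:
      // Safe back-reference: the Coordinator owns its BackendStates (they are
      // allocated from its object pool), so coord_ outlives this object.
      const Coordinator& coord_;
      const TUniqueId& query_id() const { return coord_.query_id(); }
      const TQueryCtx& query_ctx() const { return coord_.query_ctx(); }
    };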
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3489312..a240e85 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -205,7 +205,7 @@ void Coordinator::InitBackendStates() {
   int backend_idx = 0;
   for (const auto& entry: schedule_.per_backend_exec_params()) {
     BackendState* backend_state = obj_pool()->Add(
-        new BackendState(query_id(), backend_idx, filter_mode_));
+        new BackendState(*this, backend_idx, filter_mode_));
     backend_state->Init(entry.second, fragment_stats_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
@@ -335,7 +335,7 @@ void Coordinator::StartBackendExec() {
     ExecEnv::GetInstance()->exec_rpc_thread_pool()->Offer(
         [backend_state, this, &debug_options]() {
           DebugActionNoFail(schedule_.query_options(), "COORD_BEFORE_EXEC_RPC");
-          backend_state->Exec(query_ctx(), debug_options, filter_routing_table_,
+          backend_state->Exec(debug_options, filter_routing_table_,
               exec_rpcs_complete_barrier_.get());
         });
   }
@@ -708,10 +708,9 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
     // We've applied all changes from the final status report - notify waiting threads.
     backend_exec_complete_barrier_->Notify();
   }
-  // If all results have been returned, return a cancelled status to force the fragment
+  // If query execution has terminated, return a cancelled status to force the fragment
   // instance to stop executing.
-  // TODO: Make returning CANCELLED unnecessary with IMPALA-6984.
-  return ReturnedAllResults() ? Status::CANCELLED : Status::OK();
+  return exec_state_.Load() == ExecState::EXECUTING ? Status::OK() : Status::CANCELLED;
 }
 
 // TODO: add histogram/percentile
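
For context, the other half of the protocol is not in this diff: each
backend reports status periodically, and (assuming the executor treats a
non-OK report response as a cancellation signal, which is what the comment
above relies on) a backend that missed the cancel RPC recovers along the
lines of this hypothetical loop:

    // Hypothetical executor-side report loop; names are illustrative, not
    // Impala's real API.
    void ReportLoop(Executor* exec) {
      while (!exec->Done()) {
        Status response = exec->SendReportExecStatus();
        if (!response.ok()) {            // coordinator answered CANCELLED
          exec->CancelLocalInstances();  // stop finstances without a cancel RPC
          break;
        }
        exec->WaitForNextReportInterval();
      }
    }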
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index 4bcbff0..31d51e2 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -47,8 +47,11 @@ CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
 # Number of times to execute/cancel each query under test
 NUM_CANCELATION_ITERATIONS = 1
 
-# Test cancellation on both running and hung queries
-DEBUG_ACTIONS = [None, 'WAIT']
+# Test cancellation on both running and hung queries. Node ID 0 is the scan node
+WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']
+
+# Verify that failed CancelFInstances() RPCs don't lead to hung queries
+FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL']
 
 # Verify close rpc running concurrently with fetch rpc. The two cases verify:
 # False: close and fetch rpc run concurrently.
@@ -75,7 +78,9 @@ class TestCancellation(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('action', *DEBUG_ACTIONS))
+        ImpalaTestDimension('wait_action', *WAIT_ACTIONS))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('fail_rpc_action', *FAIL_RPC_ACTIONS))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE))
     cls.ImpalaTestMatrix.add_dimension(
@@ -125,9 +130,10 @@ class TestCancellation(ImpalaTestSuite):
             (file_format, query)
 
     join_before_close = vector.get_value('join_before_close')
-    action = vector.get_value('action')
-    # node ID 0 is the scan node
-    debug_action = '0:GETNEXT:' + action if action != None else ''
+    wait_action = vector.get_value('wait_action')
+    fail_rpc_action = vector.get_value('fail_rpc_action')
+
+    debug_action = "|".join(filter(None, [wait_action, fail_rpc_action]))
     vector.get_value('exec_option')['debug_action'] = debug_action
 
     vector.get_value('exec_option')['buffer_pool_limit'] =\
@@ -194,7 +200,7 @@ class TestCancellation(ImpalaTestSuite):
 
     # Executing the same query without canceling should work fine. Only do this if the
     # query has a limit or aggregation
-    if action is None and ('count' in query or 'limit' in query):
+    if not debug_action and ('count' in query or 'limit' in query):
       self.execute_query(query, vector.get_value('exec_option'))
 
   def teardown_method(self, method):
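
For reference, the two new test dimensions combine into debug_action values
like these (the '|' separator comes from the join above; the empty string
results when both dimensions are None, which is what re-enables the
run-without-cancelling check):

    0:GETNEXT:WAIT
    COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL
    0:GETNEXT:WAIT|COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL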