You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/02 00:42:18 UTC

[impala] 01/02: IMPALA-5746: Cancel all queries scheduled by failed coordinators

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9d43cfdaeeb1e0a88af3b7aefdc28aa585927a03
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Sat Jul 18 00:10:18 2020 -0700

    IMPALA-5746: Cancel all queries scheduled by failed coordinators
    
    Executors register a callback for cluster membership updates. When a
    coordinator is absent from the active cluster membership list, each
    executor cancels all the running fragments of the queries scheduled by
    that inactive coordinator, since the executor cannot send results back
    to an inactive/failed coordinator. This lets executors quickly release
    the resources allocated to the fragments being cancelled.
    
    Testing:
    - Added a new test case, TestProcessFailures::test_kill_coordinator,
      and ran it with the following command:
        ./bin/impala-py.test tests/custom_cluster/test_process_failures.py\
          ::TestProcessFailures::test_kill_coordinator \
          --exploration_strategy=exhaustive.
    - Passed the core test.
    
    Change-Id: I918fcc27649d5d2bbe8b6ef47fbd9810ae5f57bd
    Reviewed-on: http://gerrit.cloudera.org:8080/16215
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc   |  1 +
 be/src/runtime/exec-env.cc                    | 29 ++++++++---
 be/src/runtime/query-exec-mgr.cc              | 71 ++++++++++++++++++++++++++-
 be/src/runtime/query-exec-mgr.h               | 50 +++++++++++++++++--
 be/src/runtime/query-state.cc                 |  9 +++-
 be/src/runtime/query-state.h                  | 10 ++++
 be/src/runtime/test-env.cc                    |  2 +
 common/protobuf/control_service.proto         |  3 ++
 tests/custom_cluster/test_process_failures.py | 34 +++++++++++++
 9 files changed, 195 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5250927..ee6c039 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -110,6 +110,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
     const FilterRoutingTable& filter_routing_table, ExecQueryFInstancesRequestPB* request,
     TExecPlanFragmentInfo* fragment_info) {
   request->set_coord_state_idx(state_idx_);
+  *request->mutable_coord_backend_id() = ExecEnv::GetInstance()->backend_id();
   request->set_min_mem_reservation_bytes(
       backend_exec_params_.min_mem_reservation_bytes());
   request->set_initial_mem_reservation_total_claims(
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 5850ccd..347f2d7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -544,14 +544,27 @@ void ExecEnv::SetImpalaServer(ImpalaServer* server) {
   cluster_membership_mgr_->SetLocalBeDescFn([server]() {
     return server->GetLocalBackendDescriptor();
   });
-  cluster_membership_mgr_->RegisterUpdateCallbackFn(
-      [server](ClusterMembershipMgr::SnapshotPtr snapshot) {
-        std::unordered_set<BackendIdPB> current_backend_set;
-        for (const auto& it : snapshot->current_backends) {
-          current_backend_set.insert(it.second.backend_id());
-        }
-        server->CancelQueriesOnFailedBackends(current_backend_set);
-      });
+  if (FLAGS_is_coordinator) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [server](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          std::unordered_set<BackendIdPB> current_backend_set;
+          for (const auto& it : snapshot->current_backends) {
+            current_backend_set.insert(it.second.backend_id());
+          }
+          server->CancelQueriesOnFailedBackends(current_backend_set);
+        });
+  }
+  if (FLAGS_is_executor) {
+    cluster_membership_mgr_->RegisterUpdateCallbackFn(
+        [](ClusterMembershipMgr::SnapshotPtr snapshot) {
+          std::unordered_set<BackendIdPB> current_backend_set;
+          for (const auto& it : snapshot->current_backends) {
+            current_backend_set.insert(it.second.backend_id());
+          }
+          ExecEnv::GetInstance()->query_exec_mgr()->CancelQueriesForFailedCoordinators(
+              current_backend_set);
+        });
+  }
 }
 
 void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 823bd59..ac07085 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -39,7 +39,7 @@
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
-#include "util/thread.h"
+#include "util/thread-pool.h"
 
 #include "common/names.h"
 
@@ -49,6 +49,26 @@ using namespace impala;
 DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
     "every log_mem_usage_interval'th fragment completion.");
 
+DEFINE_int32(query_exec_mgr_cancellation_thread_pool_size, 1,
+    "(Advanced) Size of the QueryExecMgr thread-pool processing cancellations due to "
+    "coordinator failure");
+
+const uint32_t QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE = 65536;
+
+QueryExecMgr::QueryExecMgr() {
+  // Initialise the cancellation thread pool with 1 thread (by default). The max queue
+  // size is deliberately set so high that it should never fill; if it does we fill the
+  // queue up to the maximum limit and ignore the rest. The ignored queries will get
+  // cancelled when they time out trying to send status reports.
+  cancellation_thread_pool_.reset(new ThreadPool<QueryCancellationTask>("query-exec-mgr",
+      "cancellation-worker", FLAGS_query_exec_mgr_cancellation_thread_pool_size,
+      QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE,
+      bind<void>(&QueryExecMgr::CancelFromThreadPool, this, _2)));
+  ABORT_IF_ERROR(cancellation_thread_pool_->Init());
+}
+
+QueryExecMgr::~QueryExecMgr() {}
+
 Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
     const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
   TUniqueId query_id = query_ctx.query_id;
@@ -194,3 +214,52 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
   // decrement it after we're completely done with the query.
   ImpaladMetrics::BACKEND_NUM_QUERIES_EXECUTING->Increment(-1);
 }
+
+void QueryExecMgr::AcquireQueryStateLocked(QueryState* qs) {
+  if (qs == nullptr) return;
+  int refcnt = qs->refcnt_.Add(1);
+  DCHECK(refcnt > 0);
+}
+
+void QueryExecMgr::CancelQueriesForFailedCoordinators(
+    const std::unordered_set<BackendIdPB>& current_membership) {
+  // Build a list of queries that are scheduled by failed coordinators (as
+  // evidenced by their absence from the cluster membership list).
+  std::vector<QueryCancellationTask> to_cancel;
+  qs_map_.DoFuncForAllEntries([&](QueryState* qs) {
+    if (qs != nullptr && !qs->IsCancelled()) {
+      if (current_membership.find(qs->coord_backend_id()) == current_membership.end()) {
+        // decremented by ReleaseQueryState()
+        AcquireQueryStateLocked(qs);
+        to_cancel.push_back(QueryCancellationTask(qs));
+      }
+    }
+  });
+
+  // Since we are the only producer for the cancellation thread pool, we can find the
+  // remaining capacity of the pool and submit the new cancellation requests without
+  // blocking.
+  int query_num_to_cancel = to_cancel.size();
+  int remaining_queue_size = QUERY_EXEC_MGR_MAX_CANCELLATION_QUEUE_SIZE
+      - cancellation_thread_pool_->GetQueueSize();
+  if (query_num_to_cancel > remaining_queue_size) {
+    // Fill the queue up to maximum limit, and ignore the rest which will get cancelled
+    // eventually anyways when QueryState::ReportExecStatus() hits the timeout.
+    LOG_EVERY_N(WARNING, 60) << "QueryExecMgr cancellation queue is full";
+    query_num_to_cancel = remaining_queue_size;
+    for (int i = query_num_to_cancel; i < to_cancel.size(); ++i) {
+      ReleaseQueryState(to_cancel[i].GetQueryState());
+    }
+  }
+  for (int i = 0; i < query_num_to_cancel; ++i) {
+    cancellation_thread_pool_->Offer(to_cancel[i]);
+  }
+}
+
+void QueryExecMgr::CancelFromThreadPool(const QueryCancellationTask& cancellation_task) {
+  QueryState* qs = cancellation_task.GetQueryState();
+  VLOG(1) << "CancelFromThreadPool(): cancel query " << qs->query_id();
+  qs->Cancel();
+  qs->is_coord_active_.Store(false);
+  ReleaseQueryState(qs);
+}
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index cc15cc2..2a6ce74 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -17,9 +17,11 @@
 
 #pragma once
 
+#include "common/global-types.h"
 #include "common/status.h"
 #include "util/aligned-new.h"
 #include "util/sharded-query-map-util.h"
+#include "util/thread-pool.h"
 
 namespace impala {
 
@@ -29,12 +31,19 @@ class TExecPlanFragmentInfo;
 class TQueryCtx;
 class TUniqueId;
 
-/// A daemon-wide registry and manager of QueryStates. This is the central
-/// entry point for gaining refcounted access to a QueryState. It also initiates
-/// query execution.
+/// A daemon-wide registry and manager of QueryStates. This is the central entry
+/// point for gaining refcounted access to a QueryState. It initiates query execution.
+/// It also registers a callback function for updating of cluster membership. When
+/// coordinators are absent from the active cluster membership list, it cancels all
+/// the running fragments of the queries scheduled by the inactive coordinators.
+/// Note that we have to hold the shard lock in order to increment the refcnt for a
+/// QueryState safely.
 /// Thread-safe.
 class QueryExecMgr : public CacheLineAligned {
  public:
+  QueryExecMgr();
+  ~QueryExecMgr();
+
   /// Creates QueryState if it doesn't exist and initiates execution of all fragment
   /// instance for this query. All fragment instances hold a reference to their
   /// QueryState for the duration of their execution.
@@ -58,11 +67,37 @@ class QueryExecMgr : public CacheLineAligned {
   /// Decrements the refcount for the given QueryState.
   void ReleaseQueryState(QueryState* qs);
 
+  /// Takes a set of backend ids of active backends and cancels all the running
+  /// fragments of the queries which are scheduled by failed coordinators (that
+  /// is, ids not in the active set).
+  void CancelQueriesForFailedCoordinators(
+      const std::unordered_set<BackendIdPB>& current_membership);
+
+  /// Work item for QueryExecMgr::cancellation_thread_pool_.
+  /// This class needs to support move construction and assignment for use in ThreadPool.
+  class QueryCancellationTask {
+   public:
+    // Empty constructor needed to make ThreadPool happy.
+    QueryCancellationTask() : qs_(nullptr) {}
+    QueryCancellationTask(QueryState* qs) : qs_(qs) {}
+
+    QueryState* GetQueryState() const { return qs_; }
+
+   private:
+    // QueryState to be cancelled.
+    QueryState* qs_;
+  };
+
  private:
 
   typedef ShardedQueryMap<QueryState*> QueryStateMap;
   QueryStateMap qs_map_;
 
+  /// Thread pool to process cancellation tasks for queries scheduled by failed
+  /// coordinators to avoid blocking the statestore callback.
+  /// Set thread pool size as 1 by default since the tasks are local function calls.
+  std::unique_ptr<ThreadPool<QueryCancellationTask>> cancellation_thread_pool_;
+
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.
   /// Increments the refcount.
@@ -73,5 +108,14 @@ class QueryExecMgr : public CacheLineAligned {
   /// Return only after all fragments complete unless an instances hit
   /// an error or the query is cancelled.
   void ExecuteQueryHelper(QueryState* qs);
+
+  /// Increments the refcount for the given QueryState with caller holding the lock
+  /// of the sharded QueryState map.
+  void AcquireQueryStateLocked(QueryState* qs);
+
+  /// Helper method to process cancellations that result from failed coordinators,
+  /// called from the cancellation thread pool. The cancellation_task contains the
+  /// QueryState to be cancelled.
+  void CancelFromThreadPool(const QueryCancellationTask& cancellation_task);
 };
 }
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3cd34fb..3ad9ee0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -210,6 +210,7 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
+  *exec_rpc_params_.mutable_coord_backend_id() = exec_rpc_params->coord_backend_id();
   exec_rpc_params_.mutable_fragment_ctxs()->Swap(
       const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
           &exec_rpc_params->fragment_ctxs()));
@@ -386,10 +387,13 @@ void QueryState::UpdateBackendExecState() {
           BackendExecState::EXECUTING : BackendExecState::FINISHED;
     }
   }
-  // Send one last report if the query has reached the terminal state.
+  // Send one last report if the query has reached the terminal state
+  // and the coordinator is active.
   if (IsTerminalState()) {
     VLOG_QUERY << "UpdateBackendExecState(): last report for " << PrintId(query_id());
-    while (!ReportExecStatus()) SleepForMs(GetReportWaitTimeMs());
+    while (is_coord_active_.Load() && !ReportExecStatus()) {
+      SleepForMs(GetReportWaitTimeMs());
+    }
   }
 }
 
@@ -603,6 +607,7 @@ bool QueryState::ReportExecStatus() {
     if (!rpc_status.ok()) {
       LOG(ERROR) << "Cancelling fragment instances due to failure to reach the "
                  << "coordinator. (" << rpc_status.GetDetail() << ").";
+      is_coord_active_.Store(false);
     } else if (!result_status.ok()) {
       // If the ReportExecStatus RPC succeeded in reaching the coordinator and we get
       // back a non-OK status, it means that the coordinator expects us to cancel the
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 352ee64..575eb55 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -152,6 +152,9 @@ class QueryState {
 
   /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
+  const UniqueIdPB& coord_backend_id() const {
+    return exec_rpc_params_.coord_backend_id();
+  }
 
   /// The following getters are only valid after Init() and should be called only from
   /// the backend execution (ie. not the coordinator side, since they require holding
@@ -215,6 +218,9 @@ class QueryState {
   /// instances have finished their Prepare phase. Idempotent.
   void Cancel();
 
+  /// Return true if the executing fragment instances have been cancelled.
+  bool IsCancelled() const { return (is_cancelled_.Load() == 1); }
+
   /// Increment the resource refcount. Must be decremented before the query state
   /// reference is released. A refcount should be held by a fragment or other entity
   /// for as long as it is consuming query backend execution resources (e.g. memory).
@@ -400,6 +406,10 @@ class QueryState {
   /// initiate cancellation exactly once
   AtomicInt32 is_cancelled_;
 
+  /// set to false when the coordinator has been detected as inactive in the cluster;
+  /// used to avoid sending the last execution report to the inactive/failed coordinator.
+  AtomicBool is_coord_active_{true};
+
   /// True if and only if ReleaseExecResources() has been called.
   bool released_backend_resources_ = false;
 
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 92b703d..ddd2a03 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -161,7 +161,9 @@ Status TestEnv::CreateQueryState(
   // param
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
+  UniqueIdPB dummy_backend_id;
   rpc_params.set_coord_state_idx(0);
+  *rpc_params.mutable_coord_backend_id() = dummy_backend_id;
   rpc_params.add_fragment_ctxs();
   rpc_params.add_fragment_instance_ctxs();
   TExecPlanFragmentInfo fragment_info;
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 52495b8..8d97243 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -384,6 +384,9 @@ message ExecQueryFInstancesRequestPB {
   // Execution parameters for specific fragment instances. Corresponds to
   // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
   repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
+
+  // The Backend ID of the coordinator.
+  optional UniqueIdPB coord_backend_id = 9;
 }
 
 message ExecQueryFInstancesResponsePB {
diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py
index 4fb58a4..4e106ef 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -17,6 +17,7 @@
 
 import pytest
 
+from beeswaxd.BeeswaxService import QueryState
 from tests.common.custom_cluster_test_suite import (
     DEFAULT_CLUSTER_SIZE,
     CustomClusterTestSuite)
@@ -57,6 +58,39 @@ class TestProcessFailures(CustomClusterTestSuite):
     self.execute_query_expect_success(client, QUERY)
 
   @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1,
+      impalad_args="--status_report_max_retry_s=600 --status_report_interval_ms=1000")
+  def test_kill_coordinator(self):
+    """"Tests that when a coordinator running multiple queries is killed, all
+    running fragments on executors are cancelled."""
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    assert client is not None
+    # A query which is cancelable and takes long time to execute
+    query = "select * from tpch.lineitem t1, tpch.lineitem t2, tpch.lineitem t3 " \
+        "where t1.l_orderkey = t2.l_orderkey and t1.l_orderkey = t3.l_orderkey and " \
+        "t3.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey, " \
+        "t3.l_orderkey limit 300"
+    num_concurrent_queries = 3
+    handles = []
+
+    # Run num_concurrent_queries asynchronously
+    for _ in xrange(num_concurrent_queries):
+      handles.append(client.execute_async(query))
+
+    # Wait for the queries to start running
+    for handle in handles:
+      self.wait_for_state(handle, QueryState.RUNNING, 1000, client=client)
+
+    # Kill the coordinator
+    impalad.kill()
+
+    # Assert that all executors have 0 in-flight fragments
+    for i in xrange(1, len(self.cluster.impalads)):
+      self.cluster.impalads[i].service.wait_for_metric_value(
+        "impala-server.num-fragments-in-flight", 0, timeout=30)
+
+  @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args='--use_local_catalog',
       catalogd_args='--catalog_topic_mode=minimal')