You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/04/02 07:54:57 UTC

[impala] 01/05: IMPALA-10594: Handle failed coordinators in admissiond

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9adb093ae056f1bb7fcd14ba138fd23517648226
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Mar 18 10:42:41 2021 -0700

    IMPALA-10594: Handle failed coordinators in admissiond
    
    This patch adds a statestore callback for the admissiond that monitors
    for coordinators that have been removed from the cluster membership
    and releases all of the resources for queries running on those
    coordinators.
    
    Testing:
    - Added a custom cluster test that kills a coordinator and verifies
      that resources for queries running on it are eventually released.
    
    Change-Id: I883f323bb765680ef24b3c3f51fb209dea15f0b0
    Reviewed-on: http://gerrit.cloudera.org:8080/17209
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-control-service.cc    | 16 +++++++
 be/src/scheduling/admission-control-service.h     |  4 ++
 be/src/scheduling/admission-controller.cc         | 58 ++++++++++++++++++++---
 be/src/scheduling/admission-controller.h          | 11 ++++-
 be/src/scheduling/admissiond-env.cc               |  8 ++++
 tests/custom_cluster/test_admission_controller.py | 37 ++++++++++++++-
 6 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index 1f49f2f..f00f335 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -289,6 +289,22 @@ void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequest
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
 }
 
+void AdmissionControlService::CancelQueriesOnFailedCoordinators(
+    std::unordered_set<UniqueIdPB> current_backends) {
+  // The admission controller releases the resources and reports, per failed
+  // coordinator, which queries it released.
+  const std::unordered_map<UniqueIdPB, vector<UniqueIdPB>> released =
+      AdmissiondEnv::GetInstance()->admission_controller()
+          ->CancelQueriesOnFailedCoordinators(current_backends);
+
+  for (const auto& coord_and_queries : released) {
+    for (const UniqueIdPB& released_id : coord_and_queries.second) {
+      // Drop the admission state too; a failed Delete already logs an error.
+      discard_result(admission_state_map_.Delete(released_id));
+    }
+  }
+}
+
 void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
   shared_ptr<AdmissionState> admission_state;
   Status s = admission_state_map_.Get(query_id, &admission_state);
diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h
index 93d8bd4..b030769 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -70,6 +70,10 @@ class AdmissionControlService : public AdmissionControlServiceIf,
   /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
   static Status GetProxy(std::unique_ptr<AdmissionControlServiceProxy>* proxy);
 
+  /// Releases the resources for any queries currently running on coordinators that do not
+  /// appear in 'current_backends'. Called in response to statestore updates.
+  void CancelQueriesOnFailedCoordinators(std::unordered_set<UniqueIdPB> current_backends);
+
  private:
   friend class ImpalaHttpHandler;
 
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 6bb380d..1d1f544 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1356,9 +1356,12 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
     lock_guard<mutex> lock(admission_ctrl_lock_);
     auto host_it = running_queries_.find(coord_id);
     if (host_it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
-                  << " to get resources to release for query " << PrintId(query_id)
-                  << ", may have already been released.";
+      // In the context of the admission control service, this may happen, eg. if a
+      // coordinator is reported as failed by the statestore but a ReleaseQuery rpc from
+      // it is delayed in the network and arrives much later.
+      LOG(WARNING) << "Unable to find host " << PrintId(coord_id)
+                   << " to get resources to release for query " << PrintId(query_id)
+                   << ", may have already been released.";
       return;
     }
     auto it = host_it->second.find(query_id);
@@ -1412,9 +1415,12 @@ void AdmissionController::ReleaseQueryBackendsLocked(const UniqueIdPB& query_id,
     const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
   auto host_it = running_queries_.find(coord_id);
   if (host_it == running_queries_.end()) {
-    LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
-                << " to get resources to release backends for query "
-                << PrintId(query_id) << ", may have already been released.";
+    // In the context of the admission control service, this may happen, eg. if a
+    // coordinator is reported as failed by the statestore but a ReleaseQuery rpc from
+    // it is delayed in the network and arrives much later.
+    LOG(WARNING) << "Unable to find host " << PrintId(coord_id)
+                 << " to get resources to release backends for query "
+                 << PrintId(query_id) << ", may have already been released.";
     return;
   }
   auto it = host_it->second.find(query_id);
@@ -1485,6 +1491,46 @@ vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
   return to_clean_up;
 }
 
+std::unordered_map<UniqueIdPB, vector<UniqueIdPB>>
+AdmissionController::CancelQueriesOnFailedCoordinators(
+    std::unordered_set<UniqueIdPB> current_backends) {
+  std::unordered_map<UniqueIdPB, vector<UniqueIdPB>> to_clean_up;
+  {
+    // Snapshot the queries of every coordinator missing from 'current_backends'.
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    for (const auto& entry : running_queries_) {
+      const UniqueIdPB& coord_id = entry.first;
+      if (current_backends.find(coord_id) != current_backends.end()) continue;
+      LOG(INFO) << "Detected that coordinator " << PrintId(coord_id)
+                << " is no longer in the cluster membership. Cancelling "
+                << entry.second.size() << " queries for this coordinator.";
+      // operator[] creates the entry even if the coordinator has no queries.
+      vector<UniqueIdPB>& query_ids = to_clean_up[coord_id];
+      query_ids.reserve(entry.second.size());
+      // Iterate by reference to avoid copying each map entry.
+      for (const auto& query : entry.second) query_ids.push_back(query.first);
+    }
+  }
+
+  for (const auto& entry : to_clean_up) {
+    const UniqueIdPB& coord_id = entry.first;
+    for (const UniqueIdPB& query_id : entry.second) {
+      ReleaseQuery(query_id, coord_id, -1, /* release_remaining_backends */ true);
+    }
+
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    auto it = running_queries_.find(coord_id);
+    // The lock was dropped while releasing, so the entry may have been erased in the
+    // meantime (eg. by a concurrent call). It may also have gained queries, eg. ones
+    // that were queued; those will be released on the next statestore heartbeat.
+    // TODO: handle removing queued queries when their coordinator goes down.
+    if (it != running_queries_.end() && it->second.empty()) {
+      running_queries_.erase(it);
+    }
+  }
+  return to_clean_up;
+}
+
 Status AdmissionController::ResolvePoolAndGetConfig(
     const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index fe9dbc3..fb8bca2 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -186,7 +186,9 @@ enum class AdmissionOutcome {
 /// - ReleaseQuery rpc fails: coordinators periodically send a list of registered query
 ///   ids via a heartbeat rpc, allowing the admission contoller to clean up any queries
 ///   that are not in that list.
-/// - TODO(IMPALA-10594): handle the case of coordinators failing
+/// - Coordinator fails: the admission control service uses the statestore to detect when
+///   a coordinator has been removed from the cluster membership and releases all queries
+///   that were running at that coordinator.
 /// - RelaseQueryBackends rpc fails: when ReleaseQuery is eventually called (as guaranteed
 ///   by the above), it will automatically release any remaining backends.
 ///
@@ -405,6 +407,13 @@ class AdmissionController {
   std::vector<UniqueIdPB> CleanupQueriesForHost(
       const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids);
 
+  /// Releases the resources for any queries currently running on coordinators that do not
+  /// appear in 'current_backends'. Called in response to statestore updates. Returns a
+  /// map from the backend id of any coordinator detected to have failed to a list of
+  /// queries that were released for that coordinator.
+  std::unordered_map<UniqueIdPB, std::vector<UniqueIdPB>>
+  CancelQueriesOnFailedCoordinators(std::unordered_set<UniqueIdPB> current_backends);
+
   /// Registers the request queue topic with the statestore, starts up the dequeue thread
   /// and registers a callback with the cluster membership manager to receive updates for
   /// membership changes.
diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc
index 9f15f38..383eadf 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -99,6 +99,14 @@ Status AdmissiondEnv::Init() {
   RETURN_IF_ERROR(admission_control_svc_->Init());
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
+  cluster_membership_mgr_->RegisterUpdateCallbackFn(
+      [&](ClusterMembershipMgr::SnapshotPtr snapshot) {
+        std::unordered_set<BackendIdPB> current_backends;
+        for (const auto& it : snapshot->current_backends) {
+          current_backends.insert(it.second.backend_id());
+        }
+        admission_control_svc_->CancelQueriesOnFailedCoordinators(current_backends);
+      });
   RETURN_IF_ERROR(admission_controller_->Init());
   return Status::OK();
 }
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index c268d98..2b9ca02 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1141,9 +1141,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
     self.close_query(queued_query_handle)
 
-  def _wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+  def _wait_for_change_to_profile(
+      self, query_handle, search_string, timeout=20, client=None):
+    if client is None:
+      client = self.client
     for _ in range(timeout * 10):
-      profile = self.client.get_runtime_profile(query_handle)
+      profile = client.get_runtime_profile(query_handle)
       if search_string in profile:
         return
       sleep(0.1)
@@ -1456,6 +1459,36 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     self.wait_for_state(
         handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1")
+  def test_coordinator_failed(self):
+    """Checks that when a coordinator dies, the resources held by the queries it was
+    running are eventually released so that queued queries can be admitted."""
+    # Query designed to run for a few minutes.
+    long_query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    running_state = self.client.QUERY_STATES['RUNNING']
+    timeout_s = 10
+    # Start the first query and make sure it has been admitted.
+    first_coord = self.cluster.impalads[0]
+    first_client = first_coord.service.create_beeswax_client()
+    first_handle = first_client.execute_async(long_query)
+    self.wait_for_state(first_handle, running_state, timeout_s, client=first_client)
+
+    # Submit the same query through a different coordinator. It should be queued because
+    # only 1 query is allowed in the default pool.
+    second_coord = self.cluster.impalads[1]
+    second_client = second_coord.service.create_beeswax_client()
+    second_handle = second_client.execute_async(long_query)
+    self._wait_for_change_to_profile(
+        second_handle, "Admission result: Queued", client=second_client)
+
+    # Kill the first query's coordinator. Its resources should get cleaned up, which
+    # lets the second query be admitted.
+    first_coord.kill()
+    self.wait_for_state(second_handle, running_state, timeout_s, client=second_client)
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin