You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/04/02 07:54:57 UTC
[impala] 01/05: IMPALA-10594: Handle failed coordinators in
admissiond
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 9adb093ae056f1bb7fcd14ba138fd23517648226
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Mar 18 10:42:41 2021 -0700
IMPALA-10594: Handle failed coordinators in admissiond
This patch adds a statestore callback for the admissiond that monitors
for coordinators that have been removed from the cluster membership
and releases all of the resources for queries running on those
coordinators.
Testing:
- Added a custom cluster test that kills a coordinator and verifies
that resources for queries running on it are eventually released.
Change-Id: I883f323bb765680ef24b3c3f51fb209dea15f0b0
Reviewed-on: http://gerrit.cloudera.org:8080/17209
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/scheduling/admission-control-service.cc | 16 +++++++
be/src/scheduling/admission-control-service.h | 4 ++
be/src/scheduling/admission-controller.cc | 58 ++++++++++++++++++++---
be/src/scheduling/admission-controller.h | 11 ++++-
be/src/scheduling/admissiond-env.cc | 8 ++++
tests/custom_cluster/test_admission_controller.py | 37 ++++++++++++++-
6 files changed, 125 insertions(+), 9 deletions(-)
diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index 1f49f2f..f00f335 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -289,6 +289,22 @@ void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequest
RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
}
+// Asks the admission controller to release every query whose coordinator is missing
+// from 'current_backends', then drops the admission state this service tracks for
+// each query the controller reports as released.
+void AdmissionControlService::CancelQueriesOnFailedCoordinators(
+    std::unordered_set<UniqueIdPB> current_backends) {
+  // Keyed by the backend id of the failed coordinator; the value lists the queries
+  // that were released for it.
+  const std::unordered_map<UniqueIdPB, vector<UniqueIdPB>> released_by_coord =
+      AdmissiondEnv::GetInstance()
+          ->admission_controller()
+          ->CancelQueriesOnFailedCoordinators(current_backends);
+
+  for (const auto& coord_and_queries : released_by_coord) {
+    const vector<UniqueIdPB>& released_ids = coord_and_queries.second;
+    for (const UniqueIdPB& released_id : released_ids) {
+      // The return value is deliberately dropped: ShardedQueryMap::Delete already logs
+      // an error itself if anything goes wrong.
+      discard_result(admission_state_map_.Delete(released_id));
+    }
+  }
+}
+
void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
shared_ptr<AdmissionState> admission_state;
Status s = admission_state_map_.Get(query_id, &admission_state);
diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h
index 93d8bd4..b030769 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -70,6 +70,10 @@ class AdmissionControlService : public AdmissionControlServiceIf,
/// The newly created proxy is returned in 'proxy'. Returns error status on failure.
static Status GetProxy(std::unique_ptr<AdmissionControlServiceProxy>* proxy);
+ /// Releases the resources for any queries currently running on coordinators that do not
+ /// appear in 'current_backends'. Called in response to statestore updates.
+ void CancelQueriesOnFailedCoordinators(std::unordered_set<UniqueIdPB> current_backends);
+
private:
friend class ImpalaHttpHandler;
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 6bb380d..1d1f544 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1356,9 +1356,12 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
lock_guard<mutex> lock(admission_ctrl_lock_);
auto host_it = running_queries_.find(coord_id);
if (host_it == running_queries_.end()) {
- LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
- << " to get resources to release for query " << PrintId(query_id)
- << ", may have already been released.";
+ // In the context of the admission control service, this may happen, eg. if a
+ // coordinator is reported as failed by the statestore but a ReleaseQuery rpc from
+ // it is delayed in the network and arrives much later.
+ LOG(WARNING) << "Unable to find host " << PrintId(coord_id)
+ << " to get resources to release for query " << PrintId(query_id)
+ << ", may have already been released.";
return;
}
auto it = host_it->second.find(query_id);
@@ -1412,9 +1415,12 @@ void AdmissionController::ReleaseQueryBackendsLocked(const UniqueIdPB& query_id,
const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
auto host_it = running_queries_.find(coord_id);
if (host_it == running_queries_.end()) {
- LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
- << " to get resources to release backends for query "
- << PrintId(query_id) << ", may have already been released.";
+ // In the context of the admission control service, this may happen, eg. if a
+ // coordinator is reported as failed by the statestore but a ReleaseQuery rpc from
+ // it is delayed in the network and arrives much later.
+ LOG(WARNING) << "Unable to find host " << PrintId(coord_id)
+ << " to get resources to release backends for query "
+ << PrintId(query_id) << ", may have already been released.";
return;
}
auto it = host_it->second.find(query_id);
@@ -1485,6 +1491,46 @@ vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
return to_clean_up;
}
+// Releases the resources of every query in 'running_queries_' whose coordinator id
+// does not appear in 'current_backends'. Returns a map from each such coordinator id
+// to the ids of the queries that were released for it.
+std::unordered_map<UniqueIdPB, vector<UniqueIdPB>>
+AdmissionController::CancelQueriesOnFailedCoordinators(
+    std::unordered_set<UniqueIdPB> current_backends) {
+  std::unordered_map<UniqueIdPB, vector<UniqueIdPB>> to_clean_up;
+  {
+    // Only snapshot the affected query ids under the lock. ReleaseQuery() acquires
+    // 'admission_ctrl_lock_' itself, so the actual release happens after this scope.
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    for (const auto& entry : running_queries_) {
+      const UniqueIdPB& coord_id = entry.first;
+      if (current_backends.find(coord_id) != current_backends.end()) continue;
+
+      LOG(INFO) << "Detected that coordinator " << PrintId(coord_id)
+                << " is no longer in the cluster membership. Cancelling "
+                << entry.second.size() << " queries for this coordinator.";
+      // Look the output vector up once instead of re-hashing 'coord_id' per query. A
+      // coordinator with no queries still gets an (empty) entry so that its
+      // 'running_queries_' slot is erased below.
+      vector<UniqueIdPB>& query_ids = to_clean_up[coord_id];
+      query_ids.reserve(entry.second.size());
+      // Bind by reference: only the key is needed, so avoid copying the mapped value.
+      for (const auto& query_entry : entry.second) {
+        query_ids.push_back(query_entry.first);
+      }
+    }
+  }
+
+  for (const auto& entry : to_clean_up) {
+    const UniqueIdPB& coord_id = entry.first;
+    for (const UniqueIdPB& query_id : entry.second) {
+      ReleaseQuery(query_id, coord_id, -1, /* release_remaining_backends */ true);
+    }
+
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    auto it = running_queries_.find(coord_id);
+    // The lock was dropped between taking the snapshot and this lookup, so do not
+    // assume the coordinator's entry is still present; dereferencing end() would be
+    // undefined behaviour.
+    if (it == running_queries_.end()) continue;
+    // It's possible that more queries will have been scheduled for this coordinator
+    // since we constructed 'to_clean_up' above, e.g. because they were queued. In that
+    // case, their resources will be released on the next statestore heartbeat.
+    // TODO: handle removing queued queries when their coordinator goes down.
+    if (it->second.size() == 0) {
+      running_queries_.erase(it);
+    }
+  }
+  return to_clean_up;
+}
+
Status AdmissionController::ResolvePoolAndGetConfig(
const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index fe9dbc3..fb8bca2 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -186,7 +186,9 @@ enum class AdmissionOutcome {
/// - ReleaseQuery rpc fails: coordinators periodically send a list of registered query
/// ids via a heartbeat rpc, allowing the admission contoller to clean up any queries
/// that are not in that list.
-/// - TODO(IMPALA-10594): handle the case of coordinators failing
+/// - Coordinator fails: the admission control service uses the statestore to detect when
+/// a coordinator has been removed from the cluster membership and releases all queries
+/// that were running at that coordinator.
/// - RelaseQueryBackends rpc fails: when ReleaseQuery is eventually called (as guaranteed
/// by the above), it will automatically release any remaining backends.
///
@@ -405,6 +407,13 @@ class AdmissionController {
std::vector<UniqueIdPB> CleanupQueriesForHost(
const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids);
+ /// Releases the resources for any queries currently running on coordinators that do not
+ /// appear in 'current_backends'. Called in response to statestore updates. Returns a
+ /// map from the backend id of any coordinator detected to have failed to a list of
+ /// queries that were released for that coordinator.
+ std::unordered_map<UniqueIdPB, std::vector<UniqueIdPB>>
+ CancelQueriesOnFailedCoordinators(std::unordered_set<UniqueIdPB> current_backends);
+
/// Registers the request queue topic with the statestore, starts up the dequeue thread
/// and registers a callback with the cluster membership manager to receive updates for
/// membership changes.
diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc
index 9f15f38..383eadf 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -99,6 +99,14 @@ Status AdmissiondEnv::Init() {
RETURN_IF_ERROR(admission_control_svc_->Init());
RETURN_IF_ERROR(cluster_membership_mgr_->Init());
+ cluster_membership_mgr_->RegisterUpdateCallbackFn(
+ [&](ClusterMembershipMgr::SnapshotPtr snapshot) {
+ std::unordered_set<BackendIdPB> current_backends;
+ for (const auto& it : snapshot->current_backends) {
+ current_backends.insert(it.second.backend_id());
+ }
+ admission_control_svc_->CancelQueriesOnFailedCoordinators(current_backends);
+ });
RETURN_IF_ERROR(admission_controller_->Init());
return Status::OK();
}
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index c268d98..2b9ca02 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1141,9 +1141,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
self.close_query(queued_query_handle)
- def _wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+ def _wait_for_change_to_profile(
+ self, query_handle, search_string, timeout=20, client=None):
+ if client is None:
+ client = self.client
for _ in range(timeout * 10):
- profile = self.client.get_runtime_profile(query_handle)
+ profile = client.get_runtime_profile(query_handle)
if search_string in profile:
return
sleep(0.1)
@@ -1456,6 +1459,36 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
self.wait_for_state(
handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  # --default_pool_max_requests=1 limits the default pool to a single running query, so
+  # the second query below stays queued until the first query's slot is given back.
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1")
+  def test_coordinator_failed(self):
+    """Tests that if a coordinator fails, the resources for queries running at that
+    coordinator are eventually released."""
+    # Query designed to run for a few minutes.
+    query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    # Submit through a client connected to the first impalad, making it the coordinator
+    # of the first query.
+    impalad1 = self.cluster.impalads[0]
+    client1 = impalad1.service.create_beeswax_client()
+    handle1 = client1.execute_async(query)
+    # Upper bound, in seconds, used for both wait_for_state() calls in this test.
+    timeout_s = 10
+    # Make sure the first query has been admitted.
+    self.wait_for_state(
+        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client1)
+
+    # Run another query with a different coordinator. This query should be queued because
+    # only 1 query is allowed in the default pool.
+    impalad2 = self.cluster.impalads[1]
+    client2 = impalad2.service.create_beeswax_client()
+    handle2 = client2.execute_async(query)
+    # The profile has to be fetched through client2, the client that submitted handle2.
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued", client=client2)
+
+    # Kill the coordinator for the first query. The resources for the query should get
+    # cleaned up and the second query should be admitted.
+    impalad1.kill()
+    # The second query reaching RUNNING is the observable signal that the killed
+    # coordinator's query no longer occupies the pool's only slot.
+    self.wait_for_state(
+        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s, client=client2)
+
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between submissions
(parameterized) and the ability to submit to one impalad or many in a round-robin