You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/25 01:04:30 UTC

[impala] branch master updated (622e3c9 -> e3bafcb)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 622e3c9  IMPALA-10580: Implement ds_theta_union_f() function
     new 452c2f1  IMPALA-10604: Allow setting KuduClient's verbose log level directly
     new e3bafcb  IMPALA-10590: Introduce admission service heartbeat mechanism

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/kudu-util.cc                           |  10 +-
 be/src/scheduling/admission-control-service.cc     |  25 ++++-
 be/src/scheduling/admission-control-service.h      |   2 +
 be/src/scheduling/admission-controller.cc          | 106 ++++++++++++++++-----
 be/src/scheduling/admission-controller.h           |  64 +++++++++----
 .../scheduling/local-admission-control-client.cc   |   4 +-
 .../scheduling/remote-admission-control-client.h   |  12 ++-
 be/src/service/impala-server.cc                    |  48 ++++++++++
 be/src/service/impala-server.h                     |   7 ++
 common/protobuf/admission_control_service.proto    |  21 ++++
 tests/custom_cluster/test_admission_controller.py  |  38 +++++++-
 11 files changed, 287 insertions(+), 50 deletions(-)

[impala] 02/02: IMPALA-10590: Introduce admission service heartbeat mechanism

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e3bafcbef4fd7152ecfcbc7d331e41e9778caf15
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue Mar 16 14:17:32 2021 -0700

    IMPALA-10590: Introduce admission service heartbeat mechanism
    
    Currently, if a ReleaseQuery rpc fails, it's possible for the
    admission service to think that some resources are still being used
    that are actually free.
    
    This patch fixes the issue by introducing a periodic heartbeat rpc
    from coordinators to the admission service which contains a list of
    queries registered at that coordinator.
    
    If there is a query that the admission service thinks is running but
    is not included in the heartbeat, the admission service can conclude
    that the query must have already completed and release its resources.
    
    Testing:
    - Added a test that uses a debug action to simulate ReleaseQuery rpcs
      failing and checks that query resources are released properly.
    
    Change-Id: Ia528d92268cea487ada20b476935a81166f5ad34
    Reviewed-on: http://gerrit.cloudera.org:8080/17194
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-control-service.cc     |  25 ++++-
 be/src/scheduling/admission-control-service.h      |   2 +
 be/src/scheduling/admission-controller.cc          | 106 ++++++++++++++++-----
 be/src/scheduling/admission-controller.h           |  64 +++++++++----
 .../scheduling/local-admission-control-client.cc   |   4 +-
 .../scheduling/remote-admission-control-client.h   |  12 ++-
 be/src/service/impala-server.cc                    |  48 ++++++++++
 be/src/service/impala-server.h                     |   7 ++
 common/protobuf/admission_control_service.proto    |  21 ++++
 tests/custom_cluster/test_admission_controller.py  |  38 +++++++-
 10 files changed, 278 insertions(+), 49 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index 43d3b7f..dcdf4e8 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -217,7 +217,7 @@ void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
     lock_guard<mutex> l(admission_state->lock);
     if (!admission_state->released) {
       AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQuery(
-          req->query_id(), req->peak_mem_consumption());
+          req->query_id(), admission_state->coord_id, req->peak_mem_consumption());
       admission_state->released = true;
     } else {
       LOG(WARNING) << "Query " << req->query_id() << " was already released.";
@@ -252,7 +252,7 @@ void AdmissionControlService::ReleaseQueryBackends(
     }
 
     AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
-        req->query_id(), host_addrs);
+        req->query_id(), admission_state->coord_id, host_addrs);
   }
 
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
@@ -267,6 +267,27 @@ void AdmissionControlService::CancelAdmission(const CancelAdmissionRequestPB* re
   RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
 }
 
+void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequestPB* req,
+    AdmissionHeartbeatResponsePB* resp, kudu::rpc::RpcContext* rpc_context) {
+  VLOG(2) << "AdmissionHeartbeat: host_id=" << req->host_id();
+
+  // Gather the query ids listed in the heartbeat into a set for lookup.
+  std::unordered_set<UniqueIdPB> reported_ids(
+      req->query_ids().begin(), req->query_ids().end());
+
+  // Release every query admitted for this host that the heartbeat did not list.
+  vector<UniqueIdPB> released_ids =
+      AdmissiondEnv::GetInstance()->admission_controller()->CleanupQueriesForHost(
+          req->host_id(), reported_ids);
+
+  // Drop the admission state of each released query. ShardedQueryMap::Delete will log
+  // an error already if anything goes wrong, so just ignore the return value.
+  for (const UniqueIdPB& released_id : released_ids) {
+    discard_result(admission_state_map_.Delete(released_id));
+  }
+  RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
 void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
   shared_ptr<AdmissionState> admission_state;
   Status s = admission_state_map_.Get(query_id, &admission_state);
diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h
index 23ebce6..93d8bd4 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -63,6 +63,8 @@ class AdmissionControlService : public AdmissionControlServiceIf,
       ReleaseQueryBackendsResponsePB* resp, kudu::rpc::RpcContext* context) override;
   virtual void CancelAdmission(const CancelAdmissionRequestPB* req,
       CancelAdmissionResponsePB* resp, kudu::rpc::RpcContext* context) override;
+  virtual void AdmissionHeartbeat(const AdmissionHeartbeatRequestPB* req,
+      AdmissionHeartbeatResponsePB* resp, kudu::rpc::RpcContext* context) override;
 
   /// Gets a AdmissionControlService proxy to the configured admission control service.
   /// The newly created proxy is returned in 'proxy'. Returns error status on failure.
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 7c53ecc..208d9d2 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -683,11 +683,14 @@ void AdmissionController::PoolStats::ReleaseQuery(int64_t peak_mem_consumption)
   DCHECK_GE(agg_num_running_, 0);
 
   // Update the 'peak_mem_histogram' based on the given peak memory consumption of the
-  // query.
-  int64_t histogram_bucket =
-      BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
-  histogram_bucket = std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1;
-  peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
+  // query, if provided.
+  if (peak_mem_consumption != -1) {
+    int64_t histogram_bucket =
+        BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
+    histogram_bucket =
+        std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1;
+    peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
+  }
 }
 
 void AdmissionController::PoolStats::ReleaseMem(int64_t mem_to_release) {
@@ -1210,7 +1213,7 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
         return Status::CANCELLED;
       }
       VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
-      AdmitQuery(queue_node->admitted_schedule.get(), false);
+      AdmitQuery(queue_node, false);
       stats->UpdateWaitTime(0);
       VLOG_RPC << "Final: " << stats->DebugString();
       *schedule_result = move(queue_node->admitted_schedule->query_schedule_pb());
@@ -1343,14 +1346,24 @@ Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id,
   }
 }
 
-void AdmissionController::ReleaseQuery(
-    const UniqueIdPB& query_id, int64_t peak_mem_consumption) {
+void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
+    const UniqueIdPB& coord_id, int64_t peak_mem_consumption) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    auto it = running_queries_.find(query_id);
-    if (it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find resources to release for query "
-                  << PrintId(query_id);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
+                  << " to get resources to release for query " << PrintId(query_id)
+                  << ", may have already been released.";
+      return;
+    }
+    auto it = host_it->second.find(query_id);
+    if (it == host_it->second.end()) {
+      // In the context of the admission control service, this may happen, eg. if a
+      // ReleaseQuery rpc is reported as failed to the coordinator but actually ends up
+      // arriving much later, so only log at WARNING level.
+      LOG(WARNING) << "Unable to find resources to release for query "
+                   << PrintId(query_id) << ", may have already been released.";
       return;
     }
 
@@ -1365,19 +1378,26 @@ void AdmissionController::ReleaseQuery(
     UpdateExecGroupMetric(running_query.executor_group, -1);
     VLOG_RPC << "Released query id=" << PrintId(query_id) << " " << stats->DebugString();
     pending_dequeue_ = true;
-    running_queries_.erase(it);
+    host_it->second.erase(it);
   }
   dequeue_cv_.NotifyOne();
 }
 
-void AdmissionController::ReleaseQueryBackends(
-    const UniqueIdPB& query_id, const vector<NetworkAddressPB>& host_addrs) {
+void AdmissionController::ReleaseQueryBackends(const UniqueIdPB& query_id,
+    const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    auto it = running_queries_.find(query_id);
-    if (it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find resources to release for query backends "
-                  << PrintId(query_id);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
+                  << " to get resources to release backends for query "
+                  << PrintId(query_id) << ", may have already been released.";
+      return;
+    }
+    auto it = host_it->second.find(query_id);
+    if (it == host_it->second.end()) {
+      LOG(DFATAL) << "Unable to find resources to release backends for query "
+                  << PrintId(query_id) << ", may have already been released.";
       return;
     }
 
@@ -1408,6 +1428,37 @@ void AdmissionController::ReleaseQueryBackends(
   dequeue_cv_.NotifyOne();
 }
 
+vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
+    const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids) {
+  vector<UniqueIdPB> to_clean_up;
+  {
+    // Only collect the stale ids under the lock; ReleaseQuery() below takes
+    // 'admission_ctrl_lock_' itself, so it is called after this scope ends.
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    auto host_it = running_queries_.find(coord_id);
+    if (host_it == running_queries_.end()) {
+      // This is expected if a coordinator has not submitted any queries yet, eg. at
+      // startup, so we log at a higher verbosity level to avoid log spam.
+      VLOG(3) << "Unable to find host " << PrintId(coord_id)
+              << " to cleanup queries for.";
+      return to_clean_up;
+    }
+    // Iterate by const reference: copying an entry would copy its whole RunningQuery.
+    for (const auto& entry : host_it->second) {
+      if (query_ids.count(entry.first) == 0) to_clean_up.push_back(entry.first);
+    }
+  }
+
+  // -1 tells ReleaseQuery() that no peak memory consumption is available.
+  for (const UniqueIdPB& query_id : to_clean_up) {
+    LOG(INFO) << "Releasing resources for query " << PrintId(query_id)
+              << " as it's coordinator " << PrintId(coord_id)
+              << " reports that it is no longer registered.";
+    ReleaseQuery(query_id, coord_id, -1);
+  }
+  return to_clean_up;
+}
+
 Status AdmissionController::ResolvePoolAndGetConfig(
     const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
@@ -1877,7 +1928,7 @@ void AdmissionController::DequeueLoop() {
         DCHECK(!is_cancelled);
         DCHECK(!is_rejected);
         DCHECK(queue_node->admitted_schedule != nullptr);
-        AdmitQuery(queue_node->admitted_schedule.get(), true);
+        AdmitQuery(queue_node, true);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -1949,7 +2000,8 @@ AdmissionController::PoolStats* AdmissionController::GetPoolStats(
   return &it->second;
 }
 
-void AdmissionController::AdmitQuery(ScheduleState* state, bool was_queued) {
+void AdmissionController::AdmitQuery(QueueNode* node, bool was_queued) {
+  ScheduleState* state = node->admitted_schedule.get();
   VLOG_RPC << "For Query " << PrintId(state->query_id())
            << " per_backend_mem_limit set to: "
            << PrintBytes(state->per_backend_mem_limit())
@@ -1987,8 +2039,16 @@ void AdmissionController::AdmitQuery(ScheduleState* state, bool was_queued) {
   num_released_backends_[state->query_id()] = state->per_backend_schedule_states().size();
 
   // Store info about the admitted resources so that we can release them.
-  DCHECK(running_queries_.find(state->query_id()) == running_queries_.end());
-  RunningQuery& running_query = running_queries_[state->query_id()];
+  auto it = running_queries_.find(node->admission_request.coord_id);
+  if (it == running_queries_.end()) {
+    auto insert_result =
+        running_queries_.insert(make_pair(node->admission_request.coord_id,
+            std::unordered_map<UniqueIdPB, RunningQuery>()));
+    DCHECK(insert_result.second);
+    it = insert_result.first;
+  }
+  DCHECK(it->second.find(state->query_id()) == it->second.end());
+  RunningQuery& running_query = it->second[state->query_id()];
   running_query.request_pool = state->request_pool();
   running_query.executor_group = state->executor_group();
   for (const auto& entry : state->per_backend_schedule_states()) {
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index b281730..ed3f3ba 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -20,6 +20,7 @@
 
 #include <list>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -168,11 +169,30 @@ enum class AdmissionOutcome {
 /// When queries complete they must be explicitly released from the admission controller
 /// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
 /// the admitted memory and decrement the number of admitted queries for the resource
-/// pool. All Backends for a query must be released via 'ReleaseQueryBackends' before the
-/// query is released using 'ReleaseQuery'. Releasing Backends releases the admitted
-/// memory used by that Backend and decrements the number of running queries on the host
-/// running that Backend. Releasing a query does not release any admitted memory, it only
-/// decrements the number of running queries in the resource pool.
+/// pool.
+///
+/// In the traditional distributed admission control mode, all backends for a query
+/// must be released via 'ReleaseQueryBackends' before the query is released using
+/// 'ReleaseQuery'. This is possible to guarantee since the coordinator and
+/// AdmissionController are running in the same process.
+///
+/// In the admission control service mode, more flexibility is allowed to maintain fault
+/// tolerance in the case of rpc failures between coordinators and the admissiond. In this
+/// case, proper resource accounting is ensured with two invariants: 1) the aggregate
+/// values of resources in use always match the contents of 'running_queries_' 2) any
+/// query will always eventually be removed from 'running_queries_' and have all of its
+/// resources released, regardless of any failures.
+/// There are a few failure cases to consider:
+/// - ReleaseQuery rpc fails: coordinators periodically send a list of registered query
+///   ids via a heartbeat rpc, allowing the admission controller to clean up any queries
+///   that are not in that list.
+/// - TODO(IMPALA-10594): handle the case of coordinators failing
+/// - TODO(IMPALA-10591): handle the case of a ReleaseQueryBackends rpc failing
+///
+/// Releasing Backends releases the admitted memory used by that Backend and decrements
+/// the number of running queries on the host running that Backend. Releasing a query does
+/// not release any admitted memory, it only decrements the number of running queries in
+/// the resource pool.
 ///
 /// Executor Groups:
 /// Executors in a cluster can be assigned to executor groups. Each executor can only be
@@ -358,19 +378,28 @@ class AdmissionController {
 
   /// Updates the pool statistics when a query completes (either successfully,
   /// is cancelled or failed). This should be called for all requests that have
-  /// been submitted via AdmitQuery(). 'query_id' is the completed query and
-  /// 'peak_mem_consumption' is the peak memory consumption of the query.
+  /// been submitted via AdmitQuery(). 'query_id' is the completed query, 'coord_id' is
+  /// the backend id of the coordinator for the query, and 'peak_mem_consumption' is the
+  /// peak memory consumption of the query, which may be -1 if unavailable.
   /// This does not block.
-  void ReleaseQuery(const UniqueIdPB& query_id, int64_t peak_mem_consumption);
+  void ReleaseQuery(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
+      int64_t peak_mem_consumption);
 
   /// Updates the pool statistics when a Backend running a query completes (either
   /// successfully, is cancelled or failed). This should be called for all Backends part
   /// of a query for all queries that have been submitted via AdmitQuery().
-  /// 'query_id' is the associated query and the vector of NetworkAddressPBs identify the
-  /// completed Backends.
+  /// 'query_id' is the associated query, 'coord_id' is the backend id of the coordinator
+  /// for the query, and the vector of NetworkAddressPBs identify the completed Backends.
   /// This does not block.
-  void ReleaseQueryBackends(
-      const UniqueIdPB& query_id, const vector<NetworkAddressPB>& host_addr);
+  void ReleaseQueryBackends(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
+      const vector<NetworkAddressPB>& host_addr);
+
+  /// Releases the resources for any queries that were scheduled for the coordinator
+  /// 'coord_id' that are not in the list 'query_ids'. Only used in the context of the
+  /// admission control service. Returns a list of the queries that had their resources
+  /// released.
+  std::vector<UniqueIdPB> CleanupQueriesForHost(
+      const UniqueIdPB& coord_id, const std::unordered_set<UniqueIdPB> query_ids);
 
   /// Registers the request queue topic with the statestore, starts up the dequeue thread
   /// and registers a callback with the cluster membership manager to receive updates for
@@ -822,11 +851,12 @@ class AdmissionController {
     std::unordered_map<NetworkAddressPB, BackendAllocation> per_backend_resources;
   };
 
-  /// Map from query id of currently running queries to information about the resources
-  /// that were allocated to them. Used to properly account for resources when releasing
-  /// queries.
+  /// Map from host id to a map from query id of currently running queries to information
+  /// about the resources that were allocated to them. Used to properly account for
+  /// resources when releasing queries.
   /// Protected by admission_ctrl_lock_.
-  std::unordered_map<UniqueIdPB, RunningQuery> running_queries_;
+  std::unordered_map<UniqueIdPB, std::unordered_map<UniqueIdPB, RunningQuery>>
+      running_queries_;
 
   /// Map of pool names to the pool configs returned by request_pool_service_. Stored so
   /// that the dequeue thread does not need to access the configs via the request pool
@@ -1020,7 +1050,7 @@ class AdmissionController {
   /// Sets the per host mem limit and mem admitted in the schedule and does the necessary
   /// accounting and logging on successful submission.
   /// Caller must hold 'admission_ctrl_lock_'.
-  void AdmitQuery(ScheduleState* state, bool was_queued);
+  void AdmitQuery(QueueNode* node, bool was_queued);
 
   /// Same as PoolToJson() but requires 'admission_ctrl_lock_' to be held by the caller.
   /// Is a helper method used by both PoolToJson() and AllPoolsToJson()
diff --git a/be/src/scheduling/local-admission-control-client.cc b/be/src/scheduling/local-admission-control-client.cc
index da62ee1..faf47a5 100644
--- a/be/src/scheduling/local-admission-control-client.cc
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -50,13 +50,13 @@ Status LocalAdmissionControlClient::SubmitForAdmission(
 
 void LocalAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
   ExecEnv::GetInstance()->admission_controller()->ReleaseQuery(
-      query_id_, peak_mem_consumption);
+      query_id_, ExecEnv::GetInstance()->backend_id(), peak_mem_consumption);
 }
 
 void LocalAdmissionControlClient::ReleaseQueryBackends(
     const vector<NetworkAddressPB>& host_addrs) {
   ExecEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
-      query_id_, host_addrs);
+      query_id_, ExecEnv::GetInstance()->backend_id(), host_addrs);
 }
 
 void LocalAdmissionControlClient::CancelAdmission() {
diff --git a/be/src/scheduling/remote-admission-control-client.h b/be/src/scheduling/remote-admission-control-client.h
index 92ddd65..1cc3b5f 100644
--- a/be/src/scheduling/remote-admission-control-client.h
+++ b/be/src/scheduling/remote-admission-control-client.h
@@ -36,7 +36,17 @@ namespace impala {
 class AdmissionControlServiceProxy;
 
 /// Implementation of AdmissionControlClient used to submit queries for admission to an
-/// AdmissionController running locally on the coordinator.
+/// AdmissionController running remotely in an admissiond.
+///
+/// Handles retrying of rpcs for fault tolerance:
+/// - For the AdmitQuery() rpc, retries with jitter and backoff for a configurable amount
+///   of time, then fails the query if unsuccessful. The default retry time was chosen as
+///   a larger value (60 seconds) to minimize the number of failed queries when the
+///   admissiond is restarted or temporarily unavailable.
+/// - For the ReleaseQuery(), ReleaseQueryBackends(), and CancelAdmission() rpcs, retries
+///   just 3 times before giving up. Failures of these rpcs are not considered to fail the
+///   overall query, and there are other mechanisms in place to ensure resources are
+///   eventually released regardless of failures of these rpcs, eg. AdmissionHeartbeat.
 class RemoteAdmissionControlClient : public AdmissionControlClient {
  public:
   RemoteAdmissionControlClient(const TQueryCtx& query_ctx);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index b460d25..b5626e2 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,7 +57,9 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "gen-cpp/admission_control_service.proxy.h"
 #include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-mgr.h"
@@ -72,6 +74,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-control-service.h"
 #include "scheduling/admission-controller.h"
 #include "service/cancellation-work.h"
 #include "service/client-request-state.h"
@@ -335,6 +338,11 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
     "be converted from UTC to local time. Writes are unaffected. "
     "Can be overriden with the query option with the same name.");
 
+DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
+    "(Advanced) The time in milliseconds to wait between sending heartbeats to the "
+    "admission service, if enabled. Heartbeats are used to ensure resources are properly "
+    "accounted for even if rpcs to the admission service occasionally fail.");
+
 namespace {
 using namespace impala;
 
@@ -536,6 +544,11 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
         bind<void>(&ImpalaServer::UnresponsiveBackendThread, this),
         &unresponsive_backend_thread_));
   }
+  if (exec_env_->AdmissionServiceEnabled()) {
+    ABORT_IF_ERROR(Thread::Create("impala-server", "admission-heartbeat-thread",
+        bind<void>(&ImpalaServer::AdmissionHeartbeatThread, this),
+        &admission_heartbeat_thread_));
+  }
 
   is_coordinator_ = FLAGS_is_coordinator;
   is_executor_ = FLAGS_is_executor;
@@ -2723,6 +2736,41 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
   }
 }
 
+[[noreturn]] void ImpalaServer::AdmissionHeartbeatThread() {
+  // Loops forever: any failure is logged and the next attempt is made after the sleep.
+  while (true) {
+    SleepForMs(FLAGS_admission_heartbeat_frequency_ms);
+    std::unique_ptr<AdmissionControlServiceProxy> proxy;
+    Status get_proxy_status = AdmissionControlService::GetProxy(&proxy);
+    if (!get_proxy_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat thread was unable to get an "
+                    "AdmissionControlService proxy: " << get_proxy_status;
+      continue;
+    }
+    // Fill in this coordinator's backend id and the query id of each query driver entry.
+    AdmissionHeartbeatRequestPB request;
+    AdmissionHeartbeatResponsePB response;
+    *request.mutable_host_id() = exec_env_->backend_id();
+    query_driver_map_.DoFuncForAllEntries(
+        [&](const std::shared_ptr<QueryDriver>& query_driver) {
+          ClientRequestState* request_state = query_driver->GetActiveClientRequestState();
+          TUniqueIdToUniqueIdPB(request_state->query_id(), request.add_query_ids());
+        });
+    // Send the rpc; a transport error or a non-OK response status is only logged.
+    kudu::rpc::RpcController rpc_controller;
+    kudu::Status rpc_status =
+        proxy->AdmissionHeartbeat(request, &response, &rpc_controller);
+    if (!rpc_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat rpc failed: " << rpc_status.ToString();
+      continue;
+    }
+    Status heartbeat_status(response.status());
+    if (!heartbeat_status.ok()) {
+      LOG(ERROR) << "Admission heartbeat failed: " << heartbeat_status;
+    }
+  }
+}
+
 Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
   Coordinator* coord = crs->GetCoordinator();
   // Coordinator may be null if query has not started executing, check again later.
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a98c2b4..4b74d4b 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -1153,6 +1153,10 @@ class ImpalaServer : public ImpalaServiceIf,
   /// status report in greater than GetMaxReportRetryMs().
   [[noreturn]] void UnresponsiveBackendThread();
 
+  /// If the admission control service is enabled, periodically sends a list of all
+  /// current query ids to the admissiond.
+  [[noreturn]] void AdmissionHeartbeatThread();
+
   /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
   /// exceeded a resource limit, returns a non-OK status with information about what
   /// limit was exceeded. Returns OK if the query will continue running and expiration
@@ -1254,6 +1258,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Thread that runs UnresponsiveBackendThread().
   std::unique_ptr<Thread> unresponsive_backend_thread_;
 
+  /// Thread that runs AdmissionHeartbeatThread().
+  std::unique_ptr<Thread> admission_heartbeat_thread_;
+
   /// The QueryDriverMap maps query ids to QueryDrivers. The QueryDrivers are owned by the
   /// ImpalaServer and QueryDriverMap references them using shared_ptr to allow
   /// asynchronous deletion.
diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto
index 140a47e..798f74c 100644
--- a/common/protobuf/admission_control_service.proto
+++ b/common/protobuf/admission_control_service.proto
@@ -220,6 +220,18 @@ message CancelAdmissionResponsePB {
   optional StatusPB status = 1;
 }
 
+message AdmissionHeartbeatRequestPB {
+  // The backend id for the coordinator sending this heartbeat.
+  optional UniqueIdPB host_id = 1;
+
+  // A list of all queries registered at this coordinator.
+  repeated UniqueIdPB query_ids = 2;
+}
+
+message AdmissionHeartbeatResponsePB {
+  optional StatusPB status = 1;
+}
+
 service AdmissionControlService {
   /// Called by the coordinator to start scheduling. The actual work is done on a thread
   /// pool, so this call returns immedately. Idempotent - if the query has already been
@@ -250,4 +262,13 @@ service AdmissionControlService {
   /// Called by the coordinator to cancel scheduling of a query for which GetQueryStatus
   /// has not yet returned a schedule.
   rpc CancelAdmission(CancelAdmissionRequestPB) returns (CancelAdmissionResponsePB);
+
+  /// Used to ensure that the admission service and coordinator have a consistent view of
+  /// what resources are being used even in the face of possible rpc failures.
+  /// Periodically called by each coordinator with a list of query ids for all queries at
+  /// that coordinator. If the admissiond has resources allocated to a query that is not
+  /// included in the list, it assumes the query has completed and releases its remaining
+  /// resources.
+  rpc AdmissionHeartbeat(AdmissionHeartbeatRequestPB)
+      returns (AdmissionHeartbeatResponsePB);
 }
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index f012ae2..c445e0f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1103,10 +1103,10 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Setup to queue a query.
     sleep_query_handle = self.client.execute_async("select sleep(10000)")
     self.client.wait_for_admission_control(sleep_query_handle)
-    self.__wait_for_change_to_profile(sleep_query_handle,
+    self._wait_for_change_to_profile(sleep_query_handle,
                                       "Admission result: Admitted immediately")
     queued_query_handle = self.client.execute_async("select 2")
-    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
 
     # Change config to be invalid.
     llama_site_path = os.path.join(RESOURCES_DIR, "copy-mem-limit-test-llama-site.xml")
@@ -1130,7 +1130,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.wait_for_admission_control(sleep_query_handle)
     queued_query_handle = self.client.execute_async(
       "select * from functional_parquet.alltypes limit 1")
-    self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
+    self._wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
     # Change config to something less than the what is required to accommodate the
     # largest min_reservation (which in this case is 32.09 MB.
     config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
@@ -1141,7 +1141,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20),
     self.close_query(queued_query_handle)
 
-  def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
+  def _wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
     for _ in range(timeout * 10):
       profile = self.client.get_runtime_profile(query_handle)
       if search_string in profile:
@@ -1395,6 +1395,36 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     except ImpalaBeeswaxException as e:
       assert "Failed to admit query after waiting " in str(e)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 --default_pool_max_requests=1 "
+      "--debug_actions=IMPALA_SERVICE_POOL:127.0.0.1:29500:ReleaseQuery:FAIL@1.0")
+  def test_release_query_failed(self):
+    """Tests that if the ReleaseQuery rpc fails, the query's resources will eventually be
+    cleaned up. Uses the --debug_action flag to simulate rpc failures, and sets max
+    requests for the default pool as the number of requests per pool is decremented when
+    the entire query is released."""
+    # Query designed to run for a few minutes.
+    query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    handle1 = self.execute_query_async(query)
+    timeout_s = 10
+    # Make sure the first query has been admitted.
+    self.wait_for_state(
+        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
+    # Run another query. This query should be queued because only 1 query is allowed in
+    # the default pool.
+    handle2 = self.execute_query_async(query)
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+
+    # Cancel the first query. Its resources should be released and the second query
+    # should be admitted.
+    self.client.cancel(handle1)
+    self.client.close_query(handle1)
+    self.wait_for_state(
+        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin

[impala] 01/02: IMPALA-10604: Allow setting KuduClient's verbose log level directly

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 452c2f1f7f9cc4c8472ab38949e9990281dcc3a3
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue Mar 23 14:42:28 2021 -0700

    IMPALA-10604: Allow setting KuduClient's verbose log level directly
    
    This patch adds a flag --kudu_client_v which allows setting the
    verbose logging level for the KuduClient to a value other than the
    level for the rest of Impala (set by -v) in order to enable debugging
    of issues in the KuduClient without producing the enormous amount of
    logging that comes with setting a high -v value on all of Impala.
    
    Testing:
    - Manually set --kudu_client_v and confirmed that the expected logging
      is produced.
    
    Change-Id: Ib39358709ee714b8cdffd72a0ee58f66d5fab37e
    Reviewed-on: http://gerrit.cloudera.org:8080/17222
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/kudu-util.cc | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index cb40f69..a623273 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -48,6 +48,10 @@ using DataType = kudu::client::KuduColumnSchema::DataType;
 DEFINE_int32(kudu_client_num_reactor_threads, 4,
     "Number of threads the Kudu client can use to send rpcs to Kudu. Must be > 0.");
 
+DEFINE_int32(kudu_client_v, -1,
+    "If >= 0, used to set the verbose logging level on the Kudu client instead of using "
+    "the value of -v");
+
 DECLARE_bool(disable_kudu);
 DECLARE_int32(kudu_client_rpc_timeout_ms);
 DECLARE_int32(kudu_client_connection_negotiation_timeout_ms);
@@ -126,7 +130,11 @@ void InitKuduLogging() {
   static kudu::client::KuduLoggingFunctionCallback<void*> log_cb(&LogKuduMessage, NULL);
   kudu::client::InstallLoggingCallback(&log_cb);
   // Kudu client logging is more noisy than Impala logging, log at v-1.
-  kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
+  if (FLAGS_kudu_client_v >= 0) {
+    kudu::client::SetVerboseLogLevel(FLAGS_kudu_client_v);
+  } else {
+    kudu::client::SetVerboseLogLevel(std::max(0, FLAGS_v - 1));
+  }
 }
 
 ColumnType KuduDataTypeToColumnType(