You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/03/26 22:32:00 UTC

[impala] branch master updated: IMPALA-8731: Balance queries across multiple executor groups

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 35fe1f37f IMPALA-8731: Balance queries across multiple executor groups
35fe1f37f is described below

commit 35fe1f37f5656e615466504129c7550089f1773d
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Fri Mar 17 21:24:55 2023 +0800

    IMPALA-8731: Balance queries across multiple executor groups
    
    This patch adds a new admission control policy that attempts to
    balance queries across multiple executor groups belonging to the
    same request pool based on available memory and slots in each
    executor group. This feature can be enabled by setting the startup
    flag '-balance_queries_across_executor_groups=true'. The setting is
    off by default.
    
    Testing:
      - Add e2e tests to verify the default policy and the new policy.
    
    Change-Id: I25e851fb57c1d820c25cef5316f4ed800e4c6ac5
    Reviewed-on: http://gerrit.cloudera.org:8080/19630
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc    | 58 +++++++++++++++++++++--
 be/src/scheduling/admission-controller.h     |  8 +++-
 tests/custom_cluster/test_executor_groups.py | 69 ++++++++++++++++++++++++++++
 3 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 5aa672062..7fd062dff 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -43,8 +43,15 @@
 
 #include "common/names.h"
 
+using std::make_pair;
+using std::pair;
 using namespace strings;
 
+DEFINE_bool(balance_queries_across_executor_groups, false,
+    "If true, balance queries across multiple groups that belonging to the same request "
+    "pool based on available memory and slots in each executor group. If false, "
+    "admission is attempted to groups in alphanumerically sorted order.");
+
 DEFINE_int64(queue_wait_timeout_ms, 60 * 1000, "Maximum amount of time (in "
     "milliseconds) that a request will wait to be admitted before timing out.");
 
@@ -1844,10 +1851,10 @@ Status AdmissionController::ComputeGroupScheduleStates(
     return Status::OK();
   }
 
-  // We loop over the executor groups in a deterministic order. This means we will fill up
-  // each executor group before considering an unused one. In particular, we will not try
-  // to balance queries across executor groups equally.
-  // TODO(IMPALA-8731): balance queries across executor groups more evenly
+  // We loop over the executor groups in a deterministic order. If
+  // --balance_queries_across_executor_groups set to true, executor groups with more
+  // available memory and slots will be processed first. If the flag set to false, we will
+  // process executor groups in alphanumerically sorted order.
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy()
         || cluster_membership_mgr_->GetEmptyExecutorGroup() == executor_group)
@@ -2534,6 +2541,39 @@ string AdmissionController::MakePoolTopicKey(
       "$0$1$2$3", TOPIC_KEY_POOL_PREFIX, pool_name, TOPIC_KEY_DELIMITER, backend_id);
 }
 
+const pair<int64_t, int64_t> AdmissionController::GetAvailableMemAndSlots(
+    const ExecutorGroup& group) const {
+  int64_t total_mem_limit = 0;
+  int64_t total_slots = 0;
+  int64_t agg_effective_mem_reserved = 0;
+  int64_t agg_slots_in_use = 0;
+  for (const BackendDescriptorPB& be_desc : group.GetAllExecutorDescriptors()) {
+    total_mem_limit += be_desc.admit_mem_limit();
+    total_slots += be_desc.admission_slots();
+    int64_t host_mem_reserved = 0;
+    int64_t host_mem_admit = 0;
+    const string& host = NetworkAddressPBToString(be_desc.address());
+    auto stats = host_stats_.find(host);
+    if (stats != host_stats_.end()) {
+      host_mem_reserved = stats->second.mem_reserved;
+      host_mem_admit += stats->second.mem_admitted;
+      agg_slots_in_use += stats->second.slots_in_use;
+    }
+    for (const auto& remote_entry : remote_per_host_stats_) {
+      auto remote_stats = remote_entry.second.find(host);
+      if (remote_stats != remote_entry.second.end()) {
+        host_mem_admit += remote_stats->second.mem_admitted;
+        agg_slots_in_use += remote_stats->second.slots_in_use;
+      }
+    }
+    agg_effective_mem_reserved += std::max(host_mem_reserved, host_mem_admit);
+  }
+  DCHECK_GE(total_mem_limit, agg_effective_mem_reserved);
+  DCHECK_GE(total_slots, agg_slots_in_use);
+  return make_pair(
+      total_mem_limit - agg_effective_mem_reserved, total_slots - agg_slots_in_use);
+}
+
 vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
     const ClusterMembershipMgr::ExecutorGroups& all_groups,
     const AdmissionRequest& request) {
@@ -2567,6 +2607,16 @@ vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
     return a->name() < b->name();
   };
   sort(matching_groups.begin(), matching_groups.end(), cmp);
+  if (FLAGS_balance_queries_across_executor_groups) {
+    // Sort executor groups by available memory and slots in descending order, we
+    // prioritize executor groups that with more available memory, when their available
+    // memory are same we choose the one with more available slots, when their available
+    // memory and slots are same we choose on an alphabetical basis.
+    auto available_resource_cmp = [this](const ExecutorGroup* a, const ExecutorGroup* b) {
+      return GetAvailableMemAndSlots(*a) > GetAvailableMemAndSlots(*b);
+    };
+    sort(matching_groups.begin(), matching_groups.end(), available_resource_cmp);
+  }
   return matching_groups;
 }
 
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 20a40e11f..01b470fd7 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -277,8 +277,8 @@ enum class AdmissionOutcome {
 /// FindGroupToAdmitOrReject(). From there we call ComputeGroupScheduleStates() which
 /// calls compute schedules for both executor groups. Then we perform rejection tests and
 /// afterwards call CanAdmitRequest() for each of the schedules. Executor groups are
-/// processed in alphanumerically sorted order, so we attempt admission to group
-/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlots() to check
+/// processed in a deterministic order, see comments in ComputeGroupScheduleStates() for
+/// details. CanAdmitRequest() calls HasAvailableSlots() to check
 /// whether any of the hosts in the group can fit the new query in their available slots
 /// and since it does fit, admission succeeds. The query is admitted and 'slots_in_use'
 /// is incremented for each host in that group based on the effective parallelism of the
@@ -1130,6 +1130,10 @@ class AdmissionController {
   /// Returns the maximum number of requests that can be queued in the pool.
   static int64_t GetMaxQueuedForPool(const TPoolConfig& pool_config);
 
+  /// Returns available memory and slots of the executor group.
+  const std::pair<int64_t, int64_t> GetAvailableMemAndSlots(
+      const ExecutorGroup& group) const;
+
   /// Return all executor groups from 'all_groups' that can be used to run the query. If
   /// the query is a coordinator only query then a reserved empty group is returned
   /// otherwise returns all healthy groups that can be used to run queries for the
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 6c60b703c..579ec39f1 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -972,3 +972,72 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert self._get_num_executor_groups(exec_group_set_prefix="root.queue2") == 1
     assert self.coordinator.service.get_metric_value(
       "cluster-membership.group-set.backends.total.root.queue2") == 1
+
+  def _setup_two_coordinator_two_exec_group_cluster(self, coordinator_test_args):
+    """Start a cluster with two coordinators and two executor groups that mapped to
+    the same request pool 'root.queue1'."""
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
+                                 "resources")
+    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
+    # Start with a regular admission config with multiple pools and no resource limits.
+    self._restart_coordinators(num_coordinators=2,
+         extra_args="-fair_scheduler_allocation_path %s "
+                    "-llama_site_path %s %s" %
+                    (fs_allocation_path, llama_site_path, coordinator_test_args))
+    # Add two executor groups with 2 admission slots and 1 executor.
+    self._add_executor_group("group1", min_size=1, admission_control_slots=2,
+                             resource_pool="root.queue1")
+    self._add_executor_group("group2", min_size=1, admission_control_slots=2,
+                             resource_pool="root.queue1")
+    assert self._get_num_executor_groups(only_healthy=True) == 2
+
+  def _execute_query_async_using_client_and_verify_exec_group(self, client, query,
+    config_options, expected_group_str):
+    """Execute 'query' asynchronously using 'client' with given 'config_options'.
+    Assert existence of expected_group_str in query profile."""
+    client.set_configuration(config_options)
+    query_handle = client.execute_async(query)
+    self.wait_for_state(query_handle, client.QUERY_STATES['RUNNING'], 30, client=client)
+    assert expected_group_str in client.get_runtime_profile(query_handle)
+
+  @pytest.mark.execute_serially
+  def test_default_assign_policy_with_multiple_exec_groups_and_coordinators(self):
+    """Tests that the default admission control assign policy is filling up executor
+    groups one by one."""
+    # A long running query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypes \
+             where month < 3 and id + random() < sleep(100);"
+    coordinator_test_args = ""
+    self._setup_two_coordinator_two_exec_group_cluster(coordinator_test_args)
+    # Create fresh clients
+    self.create_impala_clients()
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    # Check that the first two queries both run in 'group1'.
+    self._execute_query_async_using_client_and_verify_exec_group(self.client,
+        QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group1")
+    self._execute_query_async_using_client_and_verify_exec_group(second_coord_client,
+        QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group1")
+    self.client.close()
+    second_coord_client.close()
+
+  @pytest.mark.execute_serially
+  def test_load_balancing_with_multiple_exec_groups_and_coordinators(self):
+    """Tests that the admission controller balance queries across multiple
+    executor groups that mapped to the same request pool when setting
+    balance_queries_across_executor_groups true."""
+    # A long running query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypes \
+             where month < 3 and id + random() < sleep(100);"
+    coordinator_test_args = "-balance_queries_across_executor_groups=true"
+    self._setup_two_coordinator_two_exec_group_cluster(coordinator_test_args)
+    # Create fresh clients
+    self.create_impala_clients()
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    # Check that two queries run in two different groups.
+    self._execute_query_async_using_client_and_verify_exec_group(self.client,
+        QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group1")
+    self._execute_query_async_using_client_and_verify_exec_group(second_coord_client,
+        QUERY, {'request_pool': 'queue1'}, "Executor Group: root.queue1-group2")
+    self.client.close()
+    second_coord_client.close()