You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2021/11/11 07:43:17 UTC

[impala] 01/03: IMPALA-10943: Add test to verify support for multiple resource and executor pools

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4d97895c6c22c4802cf97187bd4fe1e69ebc1c12
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Sep 30 18:04:08 2021 -0700

    IMPALA-10943: Add test to verify support for multiple resource and
    executor pools
    
    This patch adds a test to verify that admission control accounting
    works when using multiple coordinators and multiple executor groups
    mapped to different resource pools and having different sizes.
    
    Change-Id: If76d386d8de5730da937674ddd9a69aa1aa1355e
    Reviewed-on: http://gerrit.cloudera.org:8080/17891
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_executor_groups.py | 93 +++++++++++++++++++++++++---
 1 file changed, 86 insertions(+), 7 deletions(-)

diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 750ff0a..d89cd76 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -22,6 +22,7 @@ from tests.util.concurrent_workload import ConcurrentWorkload
 
 import json
 import logging
+import os
 import pytest
 from time import sleep
 
@@ -30,6 +31,8 @@ LOG = logging.getLogger("test_auto_scaling")
 # Non-trivial query that gets scheduled on all executors within a group.
 TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3"
 
+DEFAULT_RESOURCE_POOL = "default-pool"
+
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling clusters up and
   down by adding and removing groups of executors. All tests start with a base cluster
@@ -47,13 +50,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
     super(TestExecutorGroups, self).setup_method(method)
     self.coordinator = self.cluster.impalads[0]
 
-  def _group_name(self, name):
+  def _group_name(self, resource_pool, name_suffix):
     # By convention, group names must start with their associated resource pool name
-    # followed by a "-". Tests in this class all use the default resource pool.
-    return "default-pool-%s" % name
+    # followed by a "-". Tests in this class mostly use the default resource pool.
+    return "%s-%s" % (resource_pool, name_suffix)
 
   def _add_executor_group(self, name_suffix, min_size, num_executors=0,
-                          admission_control_slots=0, extra_args=None):
+                          admission_control_slots=0, extra_args=None,
+                          resource_pool=DEFAULT_RESOURCE_POOL):
     """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
     the new group to be considered healthy. 'num_executors' specifies the number of
     executors to start and defaults to 'min_size' but can be different from 'min_size' to
@@ -64,7 +68,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     if num_executors == 0:
       num_executors = min_size
     self.num_impalads += num_executors
-    name = self._group_name(name_suffix)
+    name = self._group_name(resource_pool, name_suffix)
     LOG.info("Adding %s executors to group %s with minimum size %s" %
              (num_executors, name, min_size))
     cluster_args = ["--impalad_args=-admission_control_slots=%s" %
@@ -131,7 +135,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     None is returned if the group has no executors or does not exist."""
     METRIC_PREFIX = "admission-controller.executor-group.num-queries-executing.{0}"
     return self.coordinator.service.get_metric_value(
-      METRIC_PREFIX.format(self._group_name(group_name_suffix)))
+      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_suffix)))
 
   def _assert_eventually_in_profile(self, query_handle, expected_str):
     """Assert with a timeout of 60 sec and a polling interval of 1 sec that the
@@ -456,7 +460,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.cancel(q3)
     self.coordinator.service.wait_for_metric_value(
       "admission-controller.executor-group.num-queries-executing.{0}".format(
-        self._group_name(group_names[0])), 0, timeout=30)
+        self._group_name(DEFAULT_RESOURCE_POOL, group_names[0])), 0, timeout=30)
 
   @pytest.mark.execute_serially
   def test_join_strategy_single_executor(self):
@@ -584,3 +588,78 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert "queue reason: Not enough memory available on host" in profile, profile
     self.close_query(handle_for_first)
     second_coord_client.close_query(handle_for_second)
+
+  @pytest.mark.execute_serially
+  def test_admission_control_with_multiple_coords_and_exec_groups(self):
+    """This test verifies that admission control accounting works when using multiple
+    coordinators and multiple executor groups mapped to different resource pools and
+    having different sizes."""
+    # A long running query that runs on every executor
+    LONG_QUERY = "select * from functional_parquet.alltypes \
+                 where month < 3 and id + random() < sleep(100);"
+    # The path to resources directory which contains the admission control config files.
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
+                                 "resources")
+    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
+    # Start with a regular admission config with multiple pools and no resource limits.
+    self._restart_coordinators(num_coordinators=2,
+                               extra_args="-vmodule admission-controller=3 "
+                                          "-fair_scheduler_allocation_path %s "
+                                          "-llama_site_path %s" % (
+                                            fs_allocation_path, llama_site_path))
+
+    # Create fresh clients
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    self.create_impala_clients()
+    # Add an exec group with a single admission slot and 2 executors.
+    self._add_executor_group("group", 2, admission_control_slots=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    # Add an exec group with a single admission slot and only 1 executor.
+    self._add_executor_group("group", 1, admission_control_slots=1,
+                             resource_pool="root.queue2", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True) == 2
+
+    # Execute a long running query on group 'queue1'
+    self.client.set_configuration({'request_pool': 'queue1'})
+    handle_long_running_queue1 = self.execute_query_async(LONG_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.executor-group.num-queries-executing.root.queue1-group",
+      1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_long_running_queue1)
+    assert "Executor Group: root.queue1-group" in profile, profile
+
+    # Try to execute another query on group 'queue1'. This one should queue.
+    handle_queued_query_queue1 = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.root.queue1", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_queued_query_queue1)
+    assert "queue reason: Not enough admission control slots available on host" in \
+           profile, profile
+
+    # Execute a query on group 'queue2'. This one will run as it's in another pool.
+    result = self.execute_query_expect_success(self.client, TEST_QUERY,
+                                               query_options={'request_pool': 'queue2'})
+    assert "Executor Group: root.queue2-group" in str(result.runtime_profile)
+
+    # Verify that multiple coordinators' accounting still works correctly in case of
+    # multiple executor groups.
+
+    # Run a query in group 'queue2' on the second coordinator
+    second_coord_client.set_configuration({'request_pool': 'queue2'})
+    second_coord_client.execute_async(LONG_QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.root.queue2", 1, timeout=30)
+
+    # Check that attempting to run another query in 'queue2' will queue the query.
+    self.client.set_configuration({'request_pool': 'queue2'})
+    handle_queued_query_queue2 = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.root.queue2", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_queued_query_queue2)
+    assert "queue reason: Not enough admission control slots available on host" in \
+           profile, profile
+
+    self.client.close()
+    second_coord_client.close()