Posted to commits@impala.apache.org by wz...@apache.org on 2021/11/11 07:43:17 UTC
[impala] 01/03: IMPALA-10943: Add test to verify support for multiple resource and executor pools
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4d97895c6c22c4802cf97187bd4fe1e69ebc1c12
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Sep 30 18:04:08 2021 -0700
IMPALA-10943: Add test to verify support for multiple resource and
executor pools
This patch adds a test to verify that admission control accounting
works when using multiple coordinators and multiple executor groups
mapped to different resource pools and having different sizes.
Change-Id: If76d386d8de5730da937674ddd9a69aa1aa1355e
Reviewed-on: http://gerrit.cloudera.org:8080/17891
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
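For readers without the Impala tree at hand: the pool definitions the test relies on come from the fair scheduler allocation file passed to the coordinators below. As a rough, hypothetical sketch (this is not the actual contents of fe/src/test/resources/fair-scheduler-allocation.xml), an allocation defining two unlimited pools under "root" might look like:

    # Hypothetical sketch only; the real pool definitions live in
    # fe/src/test/resources/fair-scheduler-allocation.xml.
    ILLUSTRATIVE_ALLOCATION = """
    <allocations>
      <queue name="root">
        <queue name="queue1"></queue>
        <queue name="queue2"></queue>
      </queue>
    </allocations>
    """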
tests/custom_cluster/test_executor_groups.py | 93 +++++++++++++++++++++++++---
1 file changed, 86 insertions(+), 7 deletions(-)
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 750ff0a..d89cd76 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -22,6 +22,7 @@ from tests.util.concurrent_workload import ConcurrentWorkload
 import json
 import logging
+import os
 import pytest
 from time import sleep
@@ -30,6 +31,8 @@ LOG = logging.getLogger("test_auto_scaling")
 # Non-trivial query that gets scheduled on all executors within a group.
 TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3"
+DEFAULT_RESOURCE_POOL = "default-pool"
+
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling clusters up and
   down by adding and removing groups of executors. All tests start with a base cluster
@@ -47,13 +50,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
     super(TestExecutorGroups, self).setup_method(method)
     self.coordinator = self.cluster.impalads[0]
 
-  def _group_name(self, name):
+  def _group_name(self, resource_pool, name_suffix):
     # By convention, group names must start with their associated resource pool name
-    # followed by a "-". Tests in this class all use the default resource pool.
-    return "default-pool-%s" % name
+    # followed by a "-". Tests in this class mostly use the default resource pool.
+    return "%s-%s" % (resource_pool, name_suffix)
 
   def _add_executor_group(self, name_suffix, min_size, num_executors=0,
-                          admission_control_slots=0, extra_args=None):
+                          admission_control_slots=0, extra_args=None,
+                          resource_pool=DEFAULT_RESOURCE_POOL):
     """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
     the new group to be considered healthy. 'num_executors' specifies the number of
     executors to start and defaults to 'min_size' but can be different from 'min_size' to
@@ -64,7 +68,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     if num_executors == 0:
       num_executors = min_size
     self.num_impalads += num_executors
-    name = self._group_name(name_suffix)
+    name = self._group_name(resource_pool, name_suffix)
     LOG.info("Adding %s executors to group %s with minimum size %s" %
              (num_executors, name, min_size))
     cluster_args = ["--impalad_args=-admission_control_slots=%s" %
@@ -131,7 +135,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     None is returned if the group has no executors or does not exist."""
     METRIC_PREFIX = "admission-controller.executor-group.num-queries-executing.{0}"
     return self.coordinator.service.get_metric_value(
-      METRIC_PREFIX.format(self._group_name(group_name_suffix)))
+      METRIC_PREFIX.format(self._group_name(DEFAULT_RESOURCE_POOL, group_name_suffix)))
 
   def _assert_eventually_in_profile(self, query_handle, expected_str):
     """Assert with a timeout of 60 sec and a polling interval of 1 sec that the
@@ -456,7 +460,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.cancel(q3)
     self.coordinator.service.wait_for_metric_value(
       "admission-controller.executor-group.num-queries-executing.{0}".format(
-        self._group_name(group_names[0])), 0, timeout=30)
+        self._group_name(DEFAULT_RESOURCE_POOL, group_names[0])), 0, timeout=30)
 
   @pytest.mark.execute_serially
   def test_join_strategy_single_executor(self):
@@ -584,3 +588,78 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert "queue reason: Not enough memory available on host" in profile, profile
     self.close_query(handle_for_first)
     second_coord_client.close_query(handle_for_second)
+
+  @pytest.mark.execute_serially
+  def test_admission_control_with_multiple_coords_and_exec_groups(self):
+    """This test verifies that admission control accounting works when using multiple
+    coordinators and multiple executor groups mapped to different resource pools and
+    having different sizes."""
+    # A long-running query that runs on every executor.
+    LONG_QUERY = "select * from functional_parquet.alltypes \
+        where month < 3 and id + random() < sleep(100);"
+    # Path to the resources directory containing the admission control config files.
+    RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test",
+                                 "resources")
+    fs_allocation_path = os.path.join(RESOURCES_DIR, "fair-scheduler-allocation.xml")
+    llama_site_path = os.path.join(RESOURCES_DIR, "llama-site-empty.xml")
+    # Start with a regular admission config with multiple pools and no resource limits.
+    self._restart_coordinators(num_coordinators=2,
+        extra_args="-vmodule admission-controller=3 "
+                   "-fair_scheduler_allocation_path %s "
+                   "-llama_site_path %s" % (
+                     fs_allocation_path, llama_site_path))
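+    # ("-vmodule admission-controller=3" raises the admission controller's log
+    # verbosity; the two path flags point admission control at the pool configs above.)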
+
+    # Create fresh clients
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    self.create_impala_clients()
+    # Add an exec group with a single admission slot and 2 executors.
+    self._add_executor_group("group", 2, admission_control_slots=1,
+                             resource_pool="root.queue1", extra_args="-mem_limit=2g")
+    # Add an exec group with a single admission slot and only 1 executor.
+    self._add_executor_group("group", 1, admission_control_slots=1,
+                             resource_pool="root.queue2", extra_args="-mem_limit=2g")
+    assert self._get_num_executor_groups(only_healthy=True) == 2
+
+    # Execute a long-running query on group 'queue1'.
+    self.client.set_configuration({'request_pool': 'queue1'})
+    handle_long_running_queue1 = self.execute_query_async(LONG_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.executor-group.num-queries-executing.root.queue1-group",
+      1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_long_running_queue1)
+    assert "Executor Group: root.queue1-group" in profile, profile
+
+    # Try to execute another query on group 'queue1'. This one should queue.
+    handle_queued_query_queue1 = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.root.queue1", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_queued_query_queue1)
+    assert "queue reason: Not enough admission control slots available on host" in \
+        profile, profile
+
+    # Execute a query on group 'queue2'. This one will run since it's in another pool.
+    result = self.execute_query_expect_success(self.client, TEST_QUERY,
+        query_options={'request_pool': 'queue2'})
+    assert "Executor Group: root.queue2-group" in str(result.runtime_profile)
+
+    # Verify that multiple coordinators' accounting still works correctly in case of
+    # multiple executor groups.
+
+    # Run a query in group 'queue2' on the second coordinator
+    second_coord_client.set_configuration({'request_pool': 'queue2'})
+    second_coord_client.execute_async(LONG_QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.root.queue2", 1, timeout=30)
+
+    # Check that attempting to run another query in 'queue2' will queue the query.
+    self.client.set_configuration({'request_pool': 'queue2'})
+    handle_queued_query_queue2 = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.root.queue2", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_queued_query_queue2)
+    assert "queue reason: Not enough admission control slots available on host" in \
+        profile, profile
+
+    self.client.close()
+    second_coord_client.close()
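
For a condensed view of the accounting this test exercises, every assertion reduces to polling three admission controller metrics per pool/group. The helper below is purely illustrative (it is not part of the patch; 'service' stands in for a coordinator's service handle such as self.coordinator.service), but the metric names are taken verbatim from the test:

  def pool_accounting(service, pool, group_suffix):
    # Illustrative helper; only the metric names are from the patch.
    group = "%s-%s" % (pool, group_suffix)  # e.g. "root.queue1-group"
    return {
      # Queries executing in the group, as seen by this coordinator.
      "executing": service.get_metric_value(
        "admission-controller.executor-group.num-queries-executing.%s" % group),
      # Queries this coordinator has queued for the pool.
      "queued_locally": service.get_metric_value(
        "admission-controller.local-num-queued.%s" % pool),
      # Running queries in the pool, aggregated across all coordinators.
      "agg_running": service.get_metric_value(
        "admission-controller.agg-num-running.%s" % pool),
    }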