You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/08/06 21:42:46 UTC
[impala] 02/02: IMPALA-8816: reduce custom cluster test runtime in core
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4fb8e8e324ad3258d24d0ae40946c954b6c21a8d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Jul 30 17:53:55 2019 -0700
IMPALA-8816: reduce custom cluster test runtime in core
This includes some optimisations and a bulk move of tests
to exhaustive.
Move a bunch of custom cluster tests to exhaustive. I selected
these partially based on runtime (i.e. I looked most carefully
at the tests that ran for over a minute) and the likelihood
of them catching a precommit bug. Regression tests for specific
edge cases and tests for parts of the code that are very stable
were prime candidates.
Remove an unnecessary cluster restart in test_breakpad.
Merge test_scheduler_error into test_failpoints to avoid an unnecessary
cluster restart.
Speed up cluster starts by ensuring that the default statestore args are
applied even when _start_impala_cluster() is called directly. This
shaves a couple of seconds off each restart. We made the default args
use a faster update frequency - see IMPALA-7185 - but they did not
take effect in all tests.
Change-Id: Ib2e3e7ebc9695baec4d69183387259958df10f62
Reviewed-on: http://gerrit.cloudera.org:8080/13967
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
tests/common/custom_cluster_test_suite.py | 7 ++-----
tests/common/impala_cluster.py | 14 +++++++++++++-
tests/common/impala_service.py | 8 ++++++--
tests/custom_cluster/test_admission_controller.py | 15 ++++++---------
tests/custom_cluster/test_always_false_filter.py | 6 ++++++
tests/custom_cluster/test_auto_scaling.py | 11 +++++++++++
tests/custom_cluster/test_breakpad.py | 5 +++--
tests/custom_cluster/test_compact_catalog_updates.py | 10 ++++++++++
tests/custom_cluster/test_exchange_deferred_batches.py | 8 +++++++-
tests/custom_cluster/test_hive_parquet_codec_interop.py | 6 ++++++
tests/custom_cluster/test_metadata_replicas.py | 6 ++++++
tests/custom_cluster/test_parquet_max_page_header.py | 10 ++++++++++
tests/custom_cluster/test_permanent_udfs.py | 6 ++++++
tests/custom_cluster/test_query_event_hooks.py | 10 ++++++++++
tests/custom_cluster/test_redaction.py | 6 ++++++
tests/custom_cluster/test_scratch_disk.py | 9 +++++++++
tests/custom_cluster/test_udf_concurrency.py | 6 ++++++
tests/custom_cluster/test_web_pages.py | 10 ++++++++++
tests/failure/test_failpoints.py | 8 +++++++-
19 files changed, 140 insertions(+), 21 deletions(-)
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 22489fe..edae090 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -111,11 +111,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
def decorate(func):
if impalad_args is not None:
func.func_dict[IMPALAD_ARGS] = impalad_args
- if statestored_args is None:
- func.func_dict[STATESTORED_ARGS] = DEFAULT_STATESTORE_ARGS
- else:
- func.func_dict[STATESTORED_ARGS] = \
- DEFAULT_STATESTORE_ARGS + " " + statestored_args
+ func.func_dict[STATESTORED_ARGS] = statestored_args
if catalogd_args is not None:
func.func_dict[CATALOGD_ARGS] = catalogd_args
if start_args is not None:
@@ -246,6 +242,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
# certain custom startup arguments work and we want to keep them independent of dev
# environments.
cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
+ '--state_store_args=%s' % DEFAULT_STATESTORE_ARGS,
'--cluster_size=%d' % cluster_size,
'--num_coordinators=%d' % num_coordinators,
'--log_dir=%s' % impala_log_dir,
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index b70d9dd..c3bee92 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -169,9 +169,21 @@ class ImpalaCluster(object):
if expected_num_ready_impalads is None:
expected_num_ready_impalads = len(self.impalads)
+ def check_processes_still_running():
+ """Check that the processes we waited for above (i.e. impalads, statestored,
+ catalogd) are still running. Throw an exception otherwise."""
+ self.refresh()
+ # The number of impalad processes may temporarily increase if breakpad forked a
+ # process to write a minidump.
+ assert len(self.impalads) >= expected_num_impalads
+ assert self.statestored is not None
+ assert self.catalogd is not None
+
+
for impalad in self.impalads:
impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
- timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
+ timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2,
+ early_abort_fn=check_processes_still_running)
if (impalad._get_arg_value("is_coordinator", default="true") == "true" and
impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0):
impalad.wait_for_catalog()
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index cba0a0f..7da100c 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -251,12 +251,16 @@ class ImpaladService(BaseImpalaService):
return False
def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
- include_shutting_down=True):
+ include_shutting_down=True, early_abort_fn=lambda: False):
+ """Poll the debug web server until the number of backends known by this service
+ reaches 'expected_value'. 'early_abort_fn' is called periodically and can
+ throw an exception if polling should be aborted early."""
start_time = time()
while (time() - start_time < timeout):
+ early_abort_fn()
value = None
try:
- value = self.get_num_known_live_backends(timeout=timeout, interval=interval,
+ value = self.get_num_known_live_backends(timeout=1, interval=interval,
include_shutting_down=include_shutting_down)
except Exception, e:
LOG.error(e)
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index c85220c..a180503 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -396,6 +396,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def test_trivial_coord_query_limits(self):
"""Tests that trivial coordinator only queries have negligible resource requirements.
"""
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
# Queries with only constant exprs or limit 0 should be admitted.
self.execute_query_expect_success(self.client, "select 1")
self.execute_query_expect_success(self.client,
@@ -856,6 +858,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
def test_query_locations_correctness(self, vector):
"""Regression test for IMPALA-7516: Test to make sure query locations and in-flight
queries are correct for different admission results that can affect it."""
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
# Choose a query that runs on all 3 backends.
query = "select * from functional.alltypesagg A, (select sleep(10000)) B limit 1"
# Case 1: When a query runs successfully.
@@ -1283,15 +1287,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
assert len(reasons) == 1
assert "Local backend has not started up yet." in reasons[0]
- @pytest.mark.execute_serially
- def test_scheduler_error(self):
- """This test verifies that the admission controller handles scheduler errors
- correctly."""
- client = self.create_impala_client()
- client.set_configuration_option("debug_action", "SCHEDULER_SCHEDULE:FAIL")
- result = self.execute_query_expect_failure(client, "select 1")
- assert "Error during scheduling" in str(result)
-
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between submissions
@@ -1795,6 +1790,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
max_queued=MAX_NUM_QUEUED_QUERIES, pool_max_mem=-1, queue_wait_timeout_ms=600000),
statestored_args=_STATESTORED_ARGS)
def test_admission_controller_with_flags(self, vector):
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
self.pool_name = 'default-pool'
# The pool has no mem resources set, so submitting queries with huge mem_limits
# should be fine. This exercises the code that does the per-pool memory
diff --git a/tests/custom_cluster/test_always_false_filter.py b/tests/custom_cluster/test_always_false_filter.py
index e64a8c2..1bc8a2d 100644
--- a/tests/custom_cluster/test_always_false_filter.py
+++ b/tests/custom_cluster/test_always_false_filter.py
@@ -26,6 +26,12 @@ class TestAlwaysFalseFilter(CustomClusterTestSuite):
def get_workload(cls):
return 'functional-query'
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestAlwaysFalseFilter, cls).setup_class()
+
@SkipIfBuildType.not_dev_build
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 2bc4600..65149bd 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -18,6 +18,7 @@
# under the License.
import logging
+import pytest
from time import sleep, time
from tests.util.auto_scaler import AutoScaler
@@ -29,6 +30,16 @@ LOG = logging.getLogger("test_auto_scaling")
class TestAutoScaling(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestAutoScaling, cls).setup_class()
+
"""This class contains tests that exercise the logic related to scaling clusters up and
down by adding and removing groups of executors."""
INITIAL_STARTUP_TIME_S = 10
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 1c2d6bf..4ed7b04 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -60,8 +60,6 @@ class TestBreakpadBase(CustomClusterTestSuite):
def teardown_class(cls):
# Re-enable core dumps
setrlimit(RLIMIT_CORE, (RLIM_INFINITY, RLIM_INFINITY))
- # Start default cluster for subsequent tests (verify_metrics).
- cls._start_impala_cluster([])
def start_cluster_with_args(self, **kwargs):
cluster_options = []
@@ -163,6 +161,9 @@ class TestBreakpadCore(TestBreakpadBase):
except CalledProcessError:
failed_to_start = True
assert failed_to_start
+ # Don't check for minidumps until all processes have gone away so that
+ # the state of the cluster is not in flux.
+ self.wait_for_num_processes('impalad', 0)
assert self.count_minidumps('impalad') > 0
diff --git a/tests/custom_cluster/test_compact_catalog_updates.py b/tests/custom_cluster/test_compact_catalog_updates.py
index 05252d5..4440e3b 100644
--- a/tests/custom_cluster/test_compact_catalog_updates.py
+++ b/tests/custom_cluster/test_compact_catalog_updates.py
@@ -22,6 +22,16 @@ import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestCompactCatalogUpdates(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestCompactCatalogUpdates, cls).setup_class()
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="--compact_catalog_topic=false",
catalogd_args="--compact_catalog_topic=false")
diff --git a/tests/custom_cluster/test_exchange_deferred_batches.py b/tests/custom_cluster/test_exchange_deferred_batches.py
index 9a27d06..31e722e 100644
--- a/tests/custom_cluster/test_exchange_deferred_batches.py
+++ b/tests/custom_cluster/test_exchange_deferred_batches.py
@@ -25,7 +25,13 @@ class TestExchangeDeferredBatches(CustomClusterTestSuite):
@classmethod
def get_workload(cls):
- return 'tpch'
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestExchangeDeferredBatches, cls).setup_class()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py
index 89795ca..d0cea20 100644
--- a/tests/custom_cluster/test_hive_parquet_codec_interop.py
+++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py
@@ -35,6 +35,12 @@ class TestParquetInterop(CustomClusterTestSuite):
return 'functional-query'
@classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestParquetInterop, cls).setup_class()
+
+ @classmethod
def add_test_dimensions(cls):
super(CustomClusterTestSuite, cls).add_test_dimensions()
# Fix the exec_option vector to have a single value.
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
index ff34cf2..1d821a0 100644
--- a/tests/custom_cluster/test_metadata_replicas.py
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -39,6 +39,12 @@ class TestMetadataReplicas(CustomClusterTestSuite):
def get_workload(cls):
return 'functional-query'
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestMetadataReplicas, cls).setup_class()
+
@pytest.mark.execute_serially
def test_start(self):
""" Baseline to verify that the initial state is identical. No DDL/DML
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py
index 0c107b2..e1c9e0e 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -32,6 +32,16 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
adjust --max_page_header_size, which is the maximum bytes of header data that the
scanner reads before it bails out.
'''
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestParquetMaxPageHeader, cls).setup_class()
+
TEXT_TABLE_NAME = "parquet_test_data_text"
PARQUET_TABLE_NAME = "large_page_header"
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index 41b4b01..5c24981 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -45,6 +45,12 @@ class TestUdfPersistence(CustomClusterTestSuite):
return 'functional-query'
@classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestUdfPersistence, cls).setup_class()
+
+ @classmethod
def add_test_dimensions(cls):
super(TestUdfPersistence, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
diff --git a/tests/custom_cluster/test_query_event_hooks.py b/tests/custom_cluster/test_query_event_hooks.py
index fc8bd38..d0d22ac 100644
--- a/tests/custom_cluster/test_query_event_hooks.py
+++ b/tests/custom_cluster/test_query_event_hooks.py
@@ -56,6 +56,16 @@ class TestHooksStartupFail(CustomClusterTestSuite):
All test cases in this testsuite are expected to fail cluster startup and will
swallow exceptions thrown during setup_method().
"""
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestHooksStartupFail, cls).setup_class()
+
FAILING_HOOK = "org.apache.impala.testutil.AlwaysErrorQueryEventHook"
NONEXIST_HOOK = "captain.hook"
LOG_DIR1 = tempfile.mkdtemp(prefix="test_hooks_startup_fail_", dir=os.getenv("LOG_DIR"))
diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py
index e13d365..1e436eb 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -170,6 +170,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
@pytest.mark.execute_serially
def test_bad_rules(self):
'''Check that the server fails to start if the redaction rules are bad.'''
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
startup_options = dict()
self.assert_server_fails_to_start('{ "version": 100 }', startup_options,
'Error parsing redaction rules; only version 1 is supported')
@@ -182,6 +184,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
could dump table data. Row logging would be enabled with "-v=3" or could be
enabled with the -vmodule option. In either case the server should not start.
'''
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
rules = r"""
{
"version": 1,
@@ -213,6 +217,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
rules are set. The expectation is the full query text will show up in the logs
and the web ui.
'''
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
self.start_cluster_using_rules('')
email = 'foo@bar.com'
self.execute_query_expect_success(self.client,
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 5640293..95136db 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -26,6 +26,15 @@ import tempfile
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestScratchDir(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestScratchDir, cls).setup_class()
# Query with order by requires spill to disk if intermediate results don't fit in mem
spill_query = """
diff --git a/tests/custom_cluster/test_udf_concurrency.py b/tests/custom_cluster/test_udf_concurrency.py
index 35a41c4..dfcd6bb 100644
--- a/tests/custom_cluster/test_udf_concurrency.py
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -38,6 +38,12 @@ class TestUdfConcurrency(CustomClusterTestSuite):
return 'functional-query'
@classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestUdfConcurrency, cls).setup_class()
+
+ @classmethod
def add_test_dimensions(cls):
super(TestUdfConcurrency, cls).add_test_dimensions()
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 58dcf93..cd4febb 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -23,6 +23,16 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestWebPage(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestWebPage, cls).setup_class()
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--enable_extended_memory_metrics=true"
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index fd7818a..7ae8e00 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -146,10 +146,16 @@ class TestFailpoints(ImpalaTestSuite):
or hangs"""
query = "select * from tpch.lineitem limit 10000"
+ # Test that the admission controller handles scheduler errors correctly.
+ debug_action = "SCHEDULER_SCHEDULE:FAIL"
+ result = self.execute_query_expect_failure(self.client, query,
+ query_options={'debug_action': debug_action})
+ assert "Error during scheduling" in str(result)
+
# Fail the Prepare() phase of all fragment instances.
debug_action = 'FIS_IN_PREPARE:FAIL@1.0'
self.execute_query_expect_failure(self.client, query,
- query_options={'debug_action':debug_action})
+ query_options={'debug_action': debug_action})
# Fail the Open() phase of all fragment instances.
debug_action = 'FIS_IN_OPEN:FAIL@1.0'