Posted to commits@impala.apache.org by jo...@apache.org on 2019/08/06 21:42:46 UTC

[impala] 02/02: IMPALA-8816: reduce custom cluster test runtime in core

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4fb8e8e324ad3258d24d0ae40946c954b6c21a8d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Jul 30 17:53:55 2019 -0700

    IMPALA-8816: reduce custom cluster test runtime in core
    
    This includes some optimisations and a bulk move of tests
    to exhaustive.
    
    Move a bunch of custom cluster tests to exhaustive. I selected
    these partly based on their runtime (i.e. I looked most carefully
    at the tests that ran for over a minute) and partly on how likely
    they are to catch a precommit bug. Regression tests for specific
    edge cases and tests for parts of the code that are very stable
    were prime candidates.
    
    Remove an unnecessary cluster restart in test_breakpad.
    
    Merge test_scheduler_error into test_failpoints to avoid an unnecessary
    cluster restart.
    
    Speed up cluster starts by ensuring that the default statestore args are
    applied even when _start_impala_cluster() is called directly. This
    shaves a couple of seconds off each restart. We made the default args
    use a faster update frequency - see IMPALA-7185 - but they did not
    take effect in all tests.
    
    Change-Id: Ib2e3e7ebc9695baec4d69183387259958df10f62
    Reviewed-on: http://gerrit.cloudera.org:8080/13967
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/common/custom_cluster_test_suite.py               |  7 ++-----
 tests/common/impala_cluster.py                          | 14 +++++++++++++-
 tests/common/impala_service.py                          |  8 ++++++--
 tests/custom_cluster/test_admission_controller.py       | 15 ++++++---------
 tests/custom_cluster/test_always_false_filter.py        |  6 ++++++
 tests/custom_cluster/test_auto_scaling.py               | 11 +++++++++++
 tests/custom_cluster/test_breakpad.py                   |  5 +++--
 tests/custom_cluster/test_compact_catalog_updates.py    | 10 ++++++++++
 tests/custom_cluster/test_exchange_deferred_batches.py  |  8 +++++++-
 tests/custom_cluster/test_hive_parquet_codec_interop.py |  6 ++++++
 tests/custom_cluster/test_metadata_replicas.py          |  6 ++++++
 tests/custom_cluster/test_parquet_max_page_header.py    | 10 ++++++++++
 tests/custom_cluster/test_permanent_udfs.py             |  6 ++++++
 tests/custom_cluster/test_query_event_hooks.py          | 10 ++++++++++
 tests/custom_cluster/test_redaction.py                  |  6 ++++++
 tests/custom_cluster/test_scratch_disk.py               |  9 +++++++++
 tests/custom_cluster/test_udf_concurrency.py            |  6 ++++++
 tests/custom_cluster/test_web_pages.py                  | 10 ++++++++++
 tests/failure/test_failpoints.py                        |  8 +++++++-
 19 files changed, 140 insertions(+), 21 deletions(-)

diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 22489fe..edae090 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -111,11 +111,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     def decorate(func):
       if impalad_args is not None:
         func.func_dict[IMPALAD_ARGS] = impalad_args
-      if statestored_args is None:
-        func.func_dict[STATESTORED_ARGS] = DEFAULT_STATESTORE_ARGS
-      else:
-        func.func_dict[STATESTORED_ARGS] = \
-            DEFAULT_STATESTORE_ARGS + " " + statestored_args
+      func.func_dict[STATESTORED_ARGS] = statestored_args
       if catalogd_args is not None:
         func.func_dict[CATALOGD_ARGS] = catalogd_args
       if start_args is not None:
@@ -246,6 +242,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     # certain custom startup arguments work and we want to keep them independent of dev
     # environments.
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
+           '--state_store_args=%s' % DEFAULT_STATESTORE_ARGS,
            '--cluster_size=%d' % cluster_size,
            '--num_coordinators=%d' % num_coordinators,
            '--log_dir=%s' % impala_log_dir,
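
Taken together, the two hunks above stop folding the default statestore args
into each test's statestored_args and instead pass them once, at the single
place that builds the start-impala-cluster.py command line, so every restart
picks up the faster update frequency from IMPALA-7185. A minimal standalone
sketch of the pattern, with an illustrative flag value rather than Impala's
actual defaults:

  # Sketch: apply baseline daemon args at the one choke point that builds the
  # start command; per-test args are appended afterwards so they can override.
  DEFAULT_STATESTORE_ARGS = '--statestore_update_frequency_ms=50'  # assumed value

  def build_start_cmd(per_test_statestore_args=''):
    """Build the cluster start command with the default args applied first."""
    args = (DEFAULT_STATESTORE_ARGS + ' ' + per_test_statestore_args).strip()
    return ['bin/start-impala-cluster.py', '--state_store_args=%s' % args]

  print(build_start_cmd('--statestore_heartbeat_frequency_ms=100'))
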
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index b70d9dd..c3bee92 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -169,9 +169,21 @@ class ImpalaCluster(object):
     if expected_num_ready_impalads is None:
       expected_num_ready_impalads = len(self.impalads)
 
+    def check_processes_still_running():
+      """Check that the processes we waited for above (i.e. impalads, statestored,
+      catalogd) are still running. Throw an exception otherwise."""
+      self.refresh()
+      # The number of impalad processes may temporarily increase if breakpad forked a
+      # process to write a minidump.
+      assert len(self.impalads) >= expected_num_impalads
+      assert self.statestored is not None
+      assert self.catalogd is not None
+
+
     for impalad in self.impalads:
       impalad.service.wait_for_num_known_live_backends(expected_num_ready_impalads,
-          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
+          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2,
+          early_abort_fn=check_processes_still_running)
       if (impalad._get_arg_value("is_coordinator", default="true") == "true" and
          impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0):
         impalad.wait_for_catalog()
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index cba0a0f..7da100c 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -251,12 +251,16 @@ class ImpaladService(BaseImpalaService):
     return False
 
   def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1,
-      include_shutting_down=True):
+      include_shutting_down=True, early_abort_fn=lambda: False):
+    """Poll the debug web server until the number of backends known by this service
+    reaches 'expected_value'. 'early_abort_fn' is called periodically and can
+    throw an exception if polling should be aborted early."""
     start_time = time()
     while (time() - start_time < timeout):
+      early_abort_fn()
       value = None
       try:
-        value = self.get_num_known_live_backends(timeout=timeout, interval=interval,
+        value = self.get_num_known_live_backends(timeout=1, interval=interval,
             include_shutting_down=include_shutting_down)
       except Exception, e:
         LOG.error(e)
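
The early_abort_fn hook added above lets the wait fail as soon as a
precondition is violated (here, a cluster process dying) instead of burning
the full timeout; impala_cluster.py wires it to check_processes_still_running.
A self-contained sketch of the polling pattern, using hypothetical names:

  import time

  def poll_until(get_value, expected, timeout_s=30, interval_s=1,
                 early_abort_fn=lambda: False):
    """Poll get_value() until it equals 'expected' or 'timeout_s' elapses.
    'early_abort_fn' runs every iteration and may raise to abort early."""
    start = time.time()
    while time.time() - start < timeout_s:
      early_abort_fn()  # raises if further polling is pointless
      if get_value() == expected:
        return True
      time.sleep(interval_s)
    return False

  def assert_daemons_alive():
    # In the real suite this would refresh the cluster state and assert that
    # statestored, catalogd and the impalads are still running; here it is
    # only a placeholder.
    pass

  assert poll_until(lambda: 3, 3, early_abort_fn=assert_daemons_alive)
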
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index c85220c..a180503 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -396,6 +396,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   def test_trivial_coord_query_limits(self):
     """Tests that trivial coordinator only queries have negligible resource requirements.
     """
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     # Queries with only constant exprs or limit 0 should be admitted.
     self.execute_query_expect_success(self.client, "select 1")
     self.execute_query_expect_success(self.client,
@@ -856,6 +858,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
   def test_query_locations_correctness(self, vector):
     """Regression test for IMPALA-7516: Test to make sure query locations and in-flight
     queries are correct for different admission results that can affect it."""
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     # Choose a query that runs on all 3 backends.
     query = "select * from functional.alltypesagg A, (select sleep(10000)) B limit 1"
     # Case 1: When a query runs successfully.
@@ -1283,15 +1287,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert len(reasons) == 1
     assert "Local backend has not started up yet." in reasons[0]
 
-  @pytest.mark.execute_serially
-  def test_scheduler_error(self):
-    """This test verifies that the admission controller handles scheduler errors
-    correctly."""
-    client = self.create_impala_client()
-    client.set_configuration_option("debug_action", "SCHEDULER_SCHEDULE:FAIL")
-    result = self.execute_query_expect_failure(client, "select 1")
-    assert "Error during scheduling" in str(result)
-
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
@@ -1795,6 +1790,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         max_queued=MAX_NUM_QUEUED_QUERIES, pool_max_mem=-1, queue_wait_timeout_ms=600000),
       statestored_args=_STATESTORED_ARGS)
   def test_admission_controller_with_flags(self, vector):
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     self.pool_name = 'default-pool'
     # The pool has no mem resources set, so submitting queries with huge mem_limits
     # should be fine. This exercises the code that does the per-pool memory
diff --git a/tests/custom_cluster/test_always_false_filter.py b/tests/custom_cluster/test_always_false_filter.py
index e64a8c2..1bc8a2d 100644
--- a/tests/custom_cluster/test_always_false_filter.py
+++ b/tests/custom_cluster/test_always_false_filter.py
@@ -26,6 +26,12 @@ class TestAlwaysFalseFilter(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestAlwaysFalseFilter, cls).setup_class()
+
   @SkipIfBuildType.not_dev_build
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
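
This file introduces the class-level variant of the "exhaustive only" guard
that recurs through the rest of the patch, while test_admission_controller.py
above uses the per-test variant. A minimal standalone sketch of both patterns
(the class and test names are illustrative):

  import pytest
  from tests.common.custom_cluster_test_suite import CustomClusterTestSuite

  class TestExample(CustomClusterTestSuite):
    @classmethod
    def setup_class(cls):
      # Class-level guard: skips every test in the class, and the cluster
      # start that goes with it, unless running the exhaustive strategy.
      if cls.exploration_strategy() != 'exhaustive':
        pytest.skip('runs only in exhaustive')
      super(TestExample, cls).setup_class()

    def test_expensive_edge_case(self):
      # Per-test guard: used when only some tests in a class are expensive
      # enough to defer to exhaustive runs.
      if self.exploration_strategy() != 'exhaustive':
        pytest.skip('runs only in exhaustive')
      # ... actual test body would go here.
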
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 2bc4600..65149bd 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import logging
+import pytest
 from time import sleep, time
 
 from tests.util.auto_scaler import AutoScaler
@@ -29,6 +30,16 @@ LOG = logging.getLogger("test_auto_scaling")
 
 
 class TestAutoScaling(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestAutoScaling, cls).setup_class()
+
   """This class contains tests that exercise the logic related to scaling clusters up and
   down by adding and removing groups of executors."""
   INITIAL_STARTUP_TIME_S = 10
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 1c2d6bf..4ed7b04 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -60,8 +60,6 @@ class TestBreakpadBase(CustomClusterTestSuite):
   def teardown_class(cls):
     # Re-enable core dumps
     setrlimit(RLIMIT_CORE, (RLIM_INFINITY, RLIM_INFINITY))
-    # Start default cluster for subsequent tests (verify_metrics).
-    cls._start_impala_cluster([])
 
   def start_cluster_with_args(self, **kwargs):
     cluster_options = []
@@ -163,6 +161,9 @@ class TestBreakpadCore(TestBreakpadBase):
     except CalledProcessError:
       failed_to_start = True
     assert failed_to_start
+    # Don't check for minidumps until all processes have gone away so that
+    # the state of the cluster is not in flux.
+    self.wait_for_num_processes('impalad', 0)
     assert self.count_minidumps('impalad') > 0
 
 
diff --git a/tests/custom_cluster/test_compact_catalog_updates.py b/tests/custom_cluster/test_compact_catalog_updates.py
index 05252d5..4440e3b 100644
--- a/tests/custom_cluster/test_compact_catalog_updates.py
+++ b/tests/custom_cluster/test_compact_catalog_updates.py
@@ -22,6 +22,16 @@ import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 class TestCompactCatalogUpdates(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestCompactCatalogUpdates, cls).setup_class()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="--compact_catalog_topic=false",
       catalogd_args="--compact_catalog_topic=false")
diff --git a/tests/custom_cluster/test_exchange_deferred_batches.py b/tests/custom_cluster/test_exchange_deferred_batches.py
index 9a27d06..31e722e 100644
--- a/tests/custom_cluster/test_exchange_deferred_batches.py
+++ b/tests/custom_cluster/test_exchange_deferred_batches.py
@@ -25,7 +25,13 @@ class TestExchangeDeferredBatches(CustomClusterTestSuite):
 
     @classmethod
     def get_workload(cls):
-      return 'tpch'
+      return 'functional-query'
+
+    @classmethod
+    def setup_class(cls):
+      if cls.exploration_strategy() != 'exhaustive':
+        pytest.skip('runs only in exhaustive')
+      super(TestExchangeDeferredBatches, cls).setup_class()
 
     @pytest.mark.execute_serially
     @CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_hive_parquet_codec_interop.py b/tests/custom_cluster/test_hive_parquet_codec_interop.py
index 89795ca..d0cea20 100644
--- a/tests/custom_cluster/test_hive_parquet_codec_interop.py
+++ b/tests/custom_cluster/test_hive_parquet_codec_interop.py
@@ -35,6 +35,12 @@ class TestParquetInterop(CustomClusterTestSuite):
     return 'functional-query'
 
   @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestParquetInterop, cls).setup_class()
+
+  @classmethod
   def add_test_dimensions(cls):
     super(CustomClusterTestSuite, cls).add_test_dimensions()
     # Fix the exec_option vector to have a single value.
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
index ff34cf2..1d821a0 100644
--- a/tests/custom_cluster/test_metadata_replicas.py
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -39,6 +39,12 @@ class TestMetadataReplicas(CustomClusterTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestMetadataReplicas, cls).setup_class()
+
   @pytest.mark.execute_serially
   def test_start(self):
     """ Baseline to verify that the initial state is identical. No DDL/DML
diff --git a/tests/custom_cluster/test_parquet_max_page_header.py b/tests/custom_cluster/test_parquet_max_page_header.py
index 0c107b2..e1c9e0e 100644
--- a/tests/custom_cluster/test_parquet_max_page_header.py
+++ b/tests/custom_cluster/test_parquet_max_page_header.py
@@ -32,6 +32,16 @@ class TestParquetMaxPageHeader(CustomClusterTestSuite):
   adjust --max_page_header_size, which is the maximum bytes of header data that the
   scanner reads before it bails out.
   '''
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestParquetMaxPageHeader, cls).setup_class()
+
 
   TEXT_TABLE_NAME = "parquet_test_data_text"
   PARQUET_TABLE_NAME = "large_page_header"
diff --git a/tests/custom_cluster/test_permanent_udfs.py b/tests/custom_cluster/test_permanent_udfs.py
index 41b4b01..5c24981 100644
--- a/tests/custom_cluster/test_permanent_udfs.py
+++ b/tests/custom_cluster/test_permanent_udfs.py
@@ -45,6 +45,12 @@ class TestUdfPersistence(CustomClusterTestSuite):
     return 'functional-query'
 
   @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestUdfPersistence, cls).setup_class()
+
+  @classmethod
   def add_test_dimensions(cls):
     super(TestUdfPersistence, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
diff --git a/tests/custom_cluster/test_query_event_hooks.py b/tests/custom_cluster/test_query_event_hooks.py
index fc8bd38..d0d22ac 100644
--- a/tests/custom_cluster/test_query_event_hooks.py
+++ b/tests/custom_cluster/test_query_event_hooks.py
@@ -56,6 +56,16 @@ class TestHooksStartupFail(CustomClusterTestSuite):
   All test cases in this testsuite are expected to fail cluster startup and will
   swallow exceptions thrown during setup_method().
   """
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestHooksStartupFail, cls).setup_class()
+
   FAILING_HOOK = "org.apache.impala.testutil.AlwaysErrorQueryEventHook"
   NONEXIST_HOOK = "captain.hook"
   LOG_DIR1 = tempfile.mkdtemp(prefix="test_hooks_startup_fail_", dir=os.getenv("LOG_DIR"))
diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py
index e13d365..1e436eb 100644
--- a/tests/custom_cluster/test_redaction.py
+++ b/tests/custom_cluster/test_redaction.py
@@ -170,6 +170,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
   @pytest.mark.execute_serially
   def test_bad_rules(self):
     '''Check that the server fails to start if the redaction rules are bad.'''
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     startup_options = dict()
     self.assert_server_fails_to_start('{ "version": 100 }', startup_options,
         'Error parsing redaction rules; only version 1 is supported')
@@ -182,6 +184,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
        could dump table data. Row logging would be enabled with "-v=3" or could be
        enabled  with the -vmodule option. In either case the server should not start.
     '''
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     rules = r"""
         {
           "version": 1,
@@ -213,6 +217,8 @@ class TestRedaction(CustomClusterTestSuite, unittest.TestCase):
        rules are set. The expectation is the full query text will show up in the logs
        and the web ui.
     '''
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
     self.start_cluster_using_rules('')
     email = 'foo@bar.com'
     self.execute_query_expect_success(self.client,
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 5640293..95136db 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -26,6 +26,15 @@ import tempfile
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 class TestScratchDir(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestScratchDir, cls).setup_class()
 
   # Query with order by requires spill to disk if intermediate results don't fit in mem
   spill_query = """
diff --git a/tests/custom_cluster/test_udf_concurrency.py b/tests/custom_cluster/test_udf_concurrency.py
index 35a41c4..dfcd6bb 100644
--- a/tests/custom_cluster/test_udf_concurrency.py
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -38,6 +38,12 @@ class TestUdfConcurrency(CustomClusterTestSuite):
     return 'functional-query'
 
   @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestUdfConcurrency, cls).setup_class()
+
+  @classmethod
   def add_test_dimensions(cls):
     super(TestUdfConcurrency, cls).add_test_dimensions()
 
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 58dcf93..cd4febb 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -23,6 +23,16 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 
 class TestWebPage(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestWebPage, cls).setup_class()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--enable_extended_memory_metrics=true"
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index fd7818a..7ae8e00 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -146,10 +146,16 @@ class TestFailpoints(ImpalaTestSuite):
     or hangs"""
     query = "select * from tpch.lineitem limit 10000"
 
+    # Test that the admission controller handles scheduler errors correctly.
+    debug_action = "SCHEDULER_SCHEDULE:FAIL"
+    result = self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action': debug_action})
+    assert "Error during scheduling" in str(result)
+
     # Fail the Prepare() phase of all fragment instances.
     debug_action = 'FIS_IN_PREPARE:FAIL@1.0'
     self.execute_query_expect_failure(self.client, query,
-        query_options={'debug_action':debug_action})
+        query_options={'debug_action': debug_action})
 
     # Fail the Open() phase of all fragment instances.
     debug_action = 'FIS_IN_OPEN:FAIL@1.0'