You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/20 00:57:24 UTC

[impala] 04/04: IMPALA-8207: Fix query loading for perf and stress tests

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b8a8edddcb727a28c2d15bdb3533a32454364ade
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Mon Feb 11 20:56:05 2019 +0000

    IMPALA-8207: Fix query loading for perf and stress tests
    
    Problems with perf queries (run-workload.py):
    - TPCH picks up stress test specific queries (TPCH-AGG1/2/3)
    - TPCDS picks up queries that were intended just to validate that data
      was loaded properly but that aren't interesting from a perf
      perspective (TPCDS-COUNT-<table>)
    - TPCDS picks up both decimal_v1 and decimal_v2 queries. This is
      mostly harmless because, for queries with matching names, only one
      gets run, but it causes some queries with mismatched names to be run
      twice (TPCDS-Q39-1/2 vs. TPCDS-Q39.1/2)
    
    Problems with stress queries (concurrent_select.py):
    - TPCDS fails to pick up Q22A as it does not use the decimal_v2
      queries, even though decimal_v2 is the default now.
    
    This problem is exacerbated by the fact that the two scripts have
    different code paths for selecting the queries, so in the past,
    changes made to one path were not always made to the other.
    
    This patch merges the two paths to reduce code duplication and prevent
    these sorts of issues in the future, and fixes the above issues.
    
    One complication is that historically the stress test has used query
    names in the form 'q1' whereas the perf test has used query names in
    the form 'TPCH-Q1'. This patch standardizes on using 'TPCH-Q1'.
    
    Testing:
    - Added a test that checks that the perf tests pick up the expected
      number of queries.
    - Manually ran the scripts and verified that the correct queries are
      selected.
    
    Change-Id: Id1966d6ca8babdda07d47e089b75ba06d0318c0d
    Reviewed-on: http://gerrit.cloudera.org:8080/12503
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../tpcds/queries/tpcds-decimal_v2-q39-1.test      |  2 +-
 .../tpcds/queries/tpcds-decimal_v2-q39-2.test      |  2 +-
 tests/infra/test_perf_infra.py                     | 44 ++++++++++++++++
 tests/infra/test_stress_infra.py                   | 10 ++--
 tests/performance/workload.py                      | 37 ++-----------
 tests/stress/concurrent_select.py                  |  9 ++--
 tests/util/parse_util.py                           |  6 ++-
 tests/util/test_file_parser.py                     | 60 ++++++++++++++++------
 8 files changed, 107 insertions(+), 63 deletions(-)

diff --git a/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-1.test b/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-1.test
index 29d712b..81e20c0 100644
--- a/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-1.test
+++ b/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-1.test
@@ -1,5 +1,5 @@
 ====
----- QUERY: TPCDS-Q39.1
+---- QUERY: TPCDS-Q39-1
 -- RESULT MISMATCH FROM ORIGINAL
 -- ADD ROUND()s TO 4th, 5th, 9th, 10th COLUMNS, TAKE ACTUAL RESULTS AS EXPECTED.
 with inv as
diff --git a/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-2.test b/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-2.test
index 503d89d..ce7beb7 100644
--- a/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-2.test
+++ b/testdata/workloads/tpcds/queries/tpcds-decimal_v2-q39-2.test
@@ -1,5 +1,5 @@
 ====
----- QUERY: TPCDS-Q39.2
+---- QUERY: TPCDS-Q39-2
 -- RESULT MISMATCH FROM ORIGINAL
 -- ADDED ROUND()s TO 4th, 5th, 9th, 10th COLUMNS, TAKE ACTUAL RESULTS AS EXPECTED.
 with inv as
diff --git a/tests/infra/test_perf_infra.py b/tests/infra/test_perf_infra.py
new file mode 100644
index 0000000..3ec4617
--- /dev/null
+++ b/tests/infra/test_perf_infra.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module attempts to enforce infrastructural assumptions that bind test tools to
+# product or other constraints. We want to stop these assumptions from breaking at
+# pre-merge time, not later.
+
+import pytest
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.performance.workload import Workload
+from tests.util.parse_util import (
+    EXPECTED_TPCDS_QUERIES_COUNT, EXPECTED_TPCH_NESTED_QUERIES_COUNT,
+    EXPECTED_TPCH_QUERIES_COUNT)
+
+
+class TestPerfInfra(ImpalaTestSuite):
+
+  @pytest.mark.parametrize(
+      'count_map',
+      [('tpcds', EXPECTED_TPCDS_QUERIES_COUNT, []),
+       ('tpch_nested', EXPECTED_TPCH_NESTED_QUERIES_COUNT, []),
+       ('tpch', EXPECTED_TPCH_QUERIES_COUNT, []),
+       ("tpch", 1, ["TPCH-Q1"]),
+       ("tpch", 12, ["TPCH-Q1.*", "TPCH-Q4"])])
+  def test_run_workload_finds_queries(self, count_map):
+    "Test that the perf tests select the expected number of queries to run."
+    workload, num_expected, query_name_filters = count_map
+    w = Workload(workload, query_name_filters)
+    assert len(w._query_map) == num_expected
diff --git a/tests/infra/test_stress_infra.py b/tests/infra/test_stress_infra.py
index bdbcdc7..20dac52 100644
--- a/tests/infra/test_stress_infra.py
+++ b/tests/infra/test_stress_infra.py
@@ -28,8 +28,8 @@ from tests.common.skip import SkipIfBuildType
 from tests.comparison.cluster import MiniCluster
 from tests.util.parse_util import (
     EXPECTED_TPCDS_QUERIES_COUNT, EXPECTED_TPCH_NESTED_QUERIES_COUNT,
-    EXPECTED_TPCH_QUERIES_COUNT, match_memory_estimate)
-from tests.util.test_file_parser import load_tpc_queries
+    EXPECTED_TPCH_STRESS_QUERIES_COUNT, match_memory_estimate)
+from tests.stress.concurrent_select import load_tpc_queries
 from tests.util.filesystem_utils import IS_LOCAL
 
 
@@ -56,7 +56,7 @@ class TestStressInfra(ImpalaTestSuite):
       'count_map',
       [('tpcds', EXPECTED_TPCDS_QUERIES_COUNT),
        ('tpch_nested', EXPECTED_TPCH_NESTED_QUERIES_COUNT),
-       ('tpch', EXPECTED_TPCH_QUERIES_COUNT)])
+       ('tpch', EXPECTED_TPCH_STRESS_QUERIES_COUNT)])
   def test_stress_finds_workloads(self, count_map):
     """
     Test that the stress test will properly load TPC workloads.
@@ -64,8 +64,8 @@ class TestStressInfra(ImpalaTestSuite):
     workload, count = count_map
     queries = load_tpc_queries(workload)
     assert count == len(queries)
-    for name in queries:
-      assert name is not None
+    for query in queries:
+      assert query.name is not None
 
   @SkipIfBuildType.remote
   def tests_minicluster_obj(self):
diff --git a/tests/performance/workload.py b/tests/performance/workload.py
index 062e4be..e6d0a6e 100644
--- a/tests/performance/workload.py
+++ b/tests/performance/workload.py
@@ -21,7 +21,7 @@ import fnmatch
 import re
 
 from tests.performance.query import Query
-from tests.util.test_file_parser import parse_query_test_file
+from tests.util.test_file_parser import load_tpc_queries
 
 class Workload(object):
   """Represents a workload.
@@ -47,7 +47,8 @@ class Workload(object):
     self._query_map = dict()
     # Build the query name -> string mapping in the c'tor. We want to fail fast and early
     # if the user input is bad.
-    self._validate_and_load(query_name_filters)
+    self._query_map = load_tpc_queries(self._name, query_name_filters=query_name_filters)
+    assert len(self._query_map) > 0, "No matching queries found for %s" % self._name
 
   @property
   def name(self):
@@ -57,38 +58,6 @@ class Workload(object):
   def query_map(self):
     return self._query_map
 
-  def _validate_and_load(self, query_name_filters):
-    """Validates that the Workload is legal."""
-    query_name_filters = map(str.strip, query_name_filters) if query_name_filters else []
-    self._base_dir = os.path.join(Workload.WORKLOAD_DIR, self._name, 'queries')
-    # Check whether the workload name corresponds to an existing directory.
-    if not os.path.isdir(self._base_dir):
-      raise ValueError("Workload %s not found in %s" % (self._name, self._base_dir))
-    sections = list()
-    # Parse all queries files for the given workload.
-    for file_name in self._list_query_files():
-      sections.extend(parse_query_test_file(file_name))
-    # If the user has specified query names, check whether all the user specified queries
-    # exist in the query files.
-    all_query_names = [s['QUERY_NAME'] for s in sections if s['QUERY_NAME'].strip()]
-    regex = re.compile(r'|'.join(['^%s$' % n for n in query_name_filters]), re.I)
-    matched_query_names = filter(lambda x: re.match(regex, x), all_query_names)
-    assert len(matched_query_names) > 0, "No matching queries found for %s" % self._name
-    # Filter the sections based on the queries the user wants.
-    sections = filter(lambda x: x['QUERY_NAME'] in matched_query_names, sections)
-    # Add the filtered queries to the query map
-    for section in sections:
-      self._query_map[section['QUERY_NAME']] = section['QUERY']
-
-  def _list_query_files(self):
-    """Return a list of all the .test files that contain queries"""
-    query_files = list()
-    for root, dirs, file_names in os.walk(self._base_dir):
-      for file_name in fnmatch.filter(file_names, '*.test'):
-        query_files.append(os.path.join(root, file_name))
-    assert len(query_files) > 0, "No Query Files found in %s" % self._base_dir
-    return query_files
-
   def construct_queries(self, test_vector, scale_factor):
     """Transform a query map into a list of query objects.
 
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index babbd04..00004a3 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -87,7 +87,7 @@ from tests.comparison.query_generator import QueryGenerator
 from tests.comparison.query_profile import DefaultProfile
 from tests.util.parse_util import (
     EXPECTED_TPCDS_QUERIES_COUNT, EXPECTED_TPCH_NESTED_QUERIES_COUNT,
-    EXPECTED_TPCH_QUERIES_COUNT, match_memory_estimate, parse_mem_to_mb)
+    EXPECTED_TPCH_STRESS_QUERIES_COUNT, match_memory_estimate, parse_mem_to_mb)
 from tests.util.thrift_util import op_handle_to_query_id
 
 LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
@@ -1307,7 +1307,8 @@ def load_tpc_queries(workload):
   """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
   LOG.info("Loading %s queries", workload)
   queries = []
-  for query_name, query_sql in test_file_parser.load_tpc_queries(workload).iteritems():
+  for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
+      include_stress_queries=True).iteritems():
     query = Query()
     query.name = query_name
     query.sql = query_sql
@@ -2186,7 +2187,7 @@ def main():
         queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_db:
     tpch_queries = load_tpc_queries("tpch")
-    assert len(tpch_queries) == EXPECTED_TPCH_QUERIES_COUNT
+    assert len(tpch_queries) == EXPECTED_TPCH_STRESS_QUERIES_COUNT
     for query in tpch_queries:
       query.db_name = args.tpch_db
     queries.extend(tpch_queries)
@@ -2204,7 +2205,7 @@ def main():
         queries.extend(generate_compute_stats_queries(cursor))
   if args.tpch_kudu_db:
     tpch_kudu_queries = load_tpc_queries("tpch")
-    assert len(tpch_kudu_queries) == EXPECTED_TPCH_QUERIES_COUNT
+    assert len(tpch_kudu_queries) == EXPECTED_TPCH_STRESS_QUERIES_COUNT
     for query in tpch_kudu_queries:
       query.db_name = args.tpch_kudu_db
     queries.extend(tpch_kudu_queries)
diff --git a/tests/util/parse_util.py b/tests/util/parse_util.py
index 031cbdb..efe56b7 100644
--- a/tests/util/parse_util.py
+++ b/tests/util/parse_util.py
@@ -22,9 +22,11 @@ from datetime import datetime
 # changed, and the stress test loses the ability to run the full set of queries. Set
 # these constants and assert that when a workload is used, all the queries we expect to
 # use are there.
-EXPECTED_TPCDS_QUERIES_COUNT = 71
+EXPECTED_TPCDS_QUERIES_COUNT = 72
 EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22
-EXPECTED_TPCH_QUERIES_COUNT = 25
+EXPECTED_TPCH_QUERIES_COUNT = 22
+# Add the number of stress test specific queries, i.e. in files like '*-stress-*.test'
+EXPECTED_TPCH_STRESS_QUERIES_COUNT = EXPECTED_TPCH_QUERIES_COUNT + 3
 # Regex to extract the estimated memory from an explain plan.
 # The unit prefixes can be found in
 # fe/src/main/java/org/apache/impala/common/PrintUtils.java
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index bb78932..eeab5ad 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -317,15 +317,18 @@ def write_test_file(test_file_name, test_file_sections, encoding=None):
     test_file.write(('\n').join(test_file_text))
 
 
-def load_tpc_queries(workload):
+def load_tpc_queries(workload, include_stress_queries=False, query_name_filters=[]):
   """
   Returns a list of queries for the given workload. 'workload' should either be 'tpch',
-  'tpcds', or 'tpch_nested'. Two types of queries are returned:
+  'tpcds', 'tpch_nested', or 'targeted-perf'. The types of queries that are returned:
   - 'standard' queries, i.e. from the spec for that workload. These queries will have
     filenames like '{workload}-q*.test' and one query per file.
-  - 'targeted' queries, which run against data from the workkload but were designed to
-    stress specific aspects of Impala. These queries have filenames like
-    '{workload}-stress-*.test' and may have multiple queries per file.
+  - 'stress' queries, if 'include_stress_queries' is true, which run against data from
+    the workload but were designed to stress specific aspects of Impala. These queries
+    have filenames like '{workload}-stress-*.test' and may have multiple queries per file.
+  - 'targeted' queries, if the workload is 'targeted-perf', which have no restrictions.
+  The returned queries are filtered according to 'query_name_filters', a list of query
+  name regexes, if specified.
   All queries are required to have a name specified, i.e. each test case should have:
     ---- QUERY: WORKLOAD-<QUERY_NAME>
   """
@@ -333,29 +336,54 @@ def load_tpc_queries(workload):
   queries = dict()
   query_dir = os.path.join(
       os.environ['IMPALA_HOME'], "testdata", "workloads", workload, "queries")
+  # Check whether the workload name corresponds to an existing directory.
+  if not os.path.isdir(query_dir):
+    raise ValueError("Workload %s not found in %s" % (workload, query_dir))
+
   # IMPALA-6715 and others from the past: This pattern enforces the queries we actually
   # find. Both workload directories contain other queries that are not part of the TPC
   # spec.
-  file_name_pattern = re.compile(r"^{0}-(q.*|stress-.*).test$".format(workload))
+  file_workload = workload
+  if workload == "tpcds":
+    # TPCDS is assumed to always use decimal_v2, which is the default since 3.0
+    file_workload = "tpcds-decimal_v2"
+  if include_stress_queries:
+    file_name_pattern = re.compile(r"^{0}-(q.*|stress-.*).test$".format(file_workload))
+  else:
+    file_name_pattern = re.compile(r"^{0}-(q.*).test$".format(file_workload))
+
   query_name_pattern = re.compile(r"^{0}-(.*)$".format(workload.upper()))
   if workload == "tpch_nested":
     query_name_pattern = re.compile(r"^TPCH-(.*)$")
+
+  if workload == "targeted-perf":
+    # We don't enforce any restrictions on targeted-perf queries.
+    file_name_pattern = re.compile(r"(.*)")
+    query_name_pattern = re.compile(r"(.*)")
+
+  query_name_filters = map(str.strip, query_name_filters) if query_name_filters else []
+  filter_regex = re.compile(r'|'.join(['^%s$' % n for n in query_name_filters]), re.I)
+
   for query_file in os.listdir(query_dir):
-    match = file_name_pattern.search(query_file)
-    if not match:
+    is_standard = "stress" not in query_file and workload != "targeted-perf"
+    file_match = file_name_pattern.search(query_file)
+    if not file_match:
       continue
     file_path = os.path.join(query_dir, query_file)
     test_cases = parse_query_test_file(file_path)
     for test_case in test_cases:
       query_sql = remove_comments(test_case["QUERY"])
-      query_name = query_name_pattern.search(test_case["QUERY_NAME"]).group(1).lower()
-      # For standard queries, we require that the query name matches the file name.
-      if "stress" not in query_file: assert match.group(1) == query_name
-      queries[query_name] = query_sql
+
+      if re.match(filter_regex, test_case["QUERY_NAME"]):
+        query_name_match = query_name_pattern.search(test_case["QUERY_NAME"])
+        # For standard queries, we require that the query name matches the file name.
+        if is_standard and file_match.group(1).upper() != query_name_match.group(1):
+          raise Exception("Query name '%s' does not match file name '%s'"
+              % (query_name_match.group(1), file_match.group(1).upper()))
+        queries[query_name_match.group(0)] = query_sql
 
     # The standard tpc queries must have one query per file.
-    if "stress" not in query_file and len(test_cases) != 1:
-      raise Exception(
-          "Expected exactly 1 query to be in file %s but got %s"
-          % (file_path, len(file_queries)))
+    if is_standard and len(test_cases) != 1:
+      raise Exception("Expected exactly 1 query to be in file %s but got %s"
+          % (file_path, len(test_cases)))
   return queries