You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/03/31 23:35:05 UTC

[04/12] impala git commit: IMPALA-5721, IMPALA-6717, IMPALA-6738: improve stress test binary search

IMPALA-5721,IMPALA-6717,IMPALA-6738: improve stress test binary search

IMPALA-5721:
- Save profiles of queries at the end of both the spilling and
  non-spilling binary search. These were not being saved before. Note
  these profiles won't have ExecSummary until IMPALA-6640 is addressed.

- Save the profile of any query that produces incorrect results during
  binary search. These were not being saved before, either.

- Use descriptive names, like
  tpch_100_parquet_q12_profile_without_spilling.txt, for profiles
  mentioned above. We do this by introducing the concept of a
  "logical_query_id" whose values look like "tpch_100_parquet_q12".

- Use the logical_query_id in critical error paths and include the
  logical_query_id in result hash files.

IMPALA-6717:
- Plumb --common-query-options through to the binary search.

IMPALA-6738:
- Begin a refactoring to reduce the number of parameters used when doing
  the binary search.

- Introduce a notion of "converted args" via a class that does the
  conversion (if needed) via property getters.

- Adjust populate_all_queries() to use converted_args

Change-Id: I33d036ec93df3016cd4703205078dbdba0168acb
Reviewed-on: http://gerrit.cloudera.org:8080/9770
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/382d7793
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/382d7793
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/382d7793

Branch: refs/heads/2.x
Commit: 382d7793955aa546082fdf0d7b28e176e22db99f
Parents: ee05cf5
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Feb 28 11:02:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 289 ++++++++++++++++++++++++---------
 1 file changed, 213 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/382d7793/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..44d7b34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,17 +62,19 @@ import sys
 import threading
 import traceback
 from Queue import Empty   # Must be before Queue below
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
 from collections import defaultdict
 from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
-from random import choice, random, randrange
+from random import choice, random, randrange, shuffle
 from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
 from time import sleep, time
 
+import tests.comparison.cli_options as cli_options
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
 from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
@@ -110,6 +112,83 @@ RESULT_HASHES_DIR = "result_hashes"
 RUNTIME_INFO_FILE_VERSION = 3
 
 
+class StressArgConverter(object):
+  def __init__(self, args):
+    """
+    Convert arguments as returned from argparse parse_args() into internal forms.
+
+    The purpose of this object is to do any conversions needed from the type given by
+    parse_args() into internal forms. For example, if a commandline option takes in a
+    complicated string that needs to be converted into a list or dictionary, this is the
+    place to do it. Access works the same as on the object returned by parse_args(),
+    i.e., object.option_attribute.
+
+    In most cases, simple arguments needn't be converted, because argparse handles the
+    type conversion already, and in most cases, type conversion (e.g., "8" <str> to 8
+    <int>) is all that's needed. If a property getter below doesn't exist, it means the
+    argument value is just passed along unconverted.
+
+    Params:
+      args: argparse.Namespace object (from argparse.ArgumentParser().parse_args())
+    """
+    assert isinstance(args, Namespace), "expected Namespace, got " + str(type(args))
+    self._args = args
+    self._common_query_options = None
+
+  def __getattr__(self, attr):
+    # This "proxies through" all the attributes from the Namespace object that are not
+    # defined in this object via property getters below.
+    return getattr(self._args, attr)
+
+  @property
+  def common_query_options(self):
+    # Memoize this, as the integrity checking of --common-query-options need only
+    # happen once.
+    if self._common_query_options is not None:
+      return self._common_query_options
+    # The stress test sets these, so callers cannot override them.
+    IGNORE_QUERY_OPTIONS = frozenset([
+        'ABORT_ON_ERROR',
+        'MEM_LIMIT',
+    ])
+    common_query_options = {}
+    if self._args.common_query_options is not None:
+      for query_option_and_value in self._args.common_query_options:
+        try:
+          query_option, value = query_option_and_value.split('=')
+        except ValueError:
+          LOG.error(
+              "Could not parse --common-query-options: '{common_query_options}'".format(
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        query_option = query_option.upper()
+        if query_option in common_query_options:
+          LOG.error(
+              "Query option '{query_option}' already defined in --common-query-options: "
+              "'{common_query_options}'".format(
+                  query_option=query_option,
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        elif query_option in IGNORE_QUERY_OPTIONS:
+          LOG.warn(
+              "Ignoring '{query_option}' in common query options: '{opt}': "
+              "The stress test algorithm needs control of this option.".format(
+                  query_option=query_option, opt=self._args.common_query_options))
+        else:
+          common_query_options[query_option] = value
+          LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+              query_option=query_option, value=value))
+    self._common_query_options = common_query_options
+    return self._common_query_options
+
+  @property
+  def runtime_info_path(self):
+    runtime_info_path = self._args.runtime_info_path
+    if "{cm_host}" in runtime_info_path:
+      runtime_info_path = runtime_info_path.format(cm_host=self._args.cm_host)
+    return runtime_info_path
+
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -169,7 +248,9 @@ def print_crash_info_if_exists(impala, start_time):
 class QueryReport(object):
   """Holds information about a single query run."""
 
-  def __init__(self):
+  def __init__(self, query):
+    self.query = query
+
     self.result_hash = None
     self.runtime_secs = None
     self.mem_was_spilled = False
@@ -180,6 +261,32 @@ class QueryReport(object):
     self.profile = None
     self.query_id = None
 
+  def write_query_profile(self, directory, prefix=None):
+    """
+    Write out the query profile bound to this object to a given directory.
+
+    The file name is generated and will contain the query ID. Use the optional prefix
+    parameter to set a prefix on the filename.
+
+    Example file name:
+      tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt
+
+    Parameters:
+      directory (str): Directory to write profile.
+      prefix (str): Prefix for filename.
+    """
+    if not (self.profile and self.query_id):
+      return
+    if prefix is not None:
+      file_name = prefix + '_'
+    else:
+      file_name = ''
+    file_name += self.query.logical_query_id + '_'
+    file_name += self.query_id.replace(":", "_") + "_profile.txt"
+    profile_log_path = os.path.join(directory, file_name)
+    with open(profile_log_path, "w") as profile_log:
+      profile_log.write(self.profile)
+
 
 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
@@ -632,14 +739,18 @@ class StressRunner(object):
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
-          self._write_query_profile(report)
-          raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
+          self._write_query_profile(report, PROFILES_DIR, prefix='error')
+          raise Exception("Query {query} ID {id} failed: {mesg}".format(
+              query=query.logical_query_id,
+              id=report.query_id,
+              mesg=error_msg))
         if (
             report.mem_limit_exceeded and
             not self._mem_broker.was_overcommitted(reservation_id)
         ):
           increment(self._num_successive_errors)
-          self._write_query_profile(report)
+          self._write_query_profile(
+              report, PROFILES_DIR, prefix='unexpected_mem_exceeded')
           raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
                           "Query ID: {0}".format(report.query_id))
         if (
@@ -649,18 +760,19 @@ class StressRunner(object):
         ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
           raise Exception(dedent("""\
                                  Result hash mismatch; expected {expected}, got {actual}
                                  Query ID: {id}
                                  Query: {query}""".format(expected=query.result_hash,
                                                           actual=report.result_hash,
                                                           id=report.query_id,
-                                                          query=query.sql)))
+                                                          query=query.logical_query_id)))
         if report.timed_out and not should_cancel:
-          self._write_query_profile(report)
+          self._write_query_profile(report, PROFILES_DIR, prefix='timed_out')
           raise Exception(
-              "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
+              "Query {query} unexpectedly timed out. Query ID: {id}".format(
+                  query=query.logical_query_id, id=report.query_id))
         self._num_successive_errors.value = 0
 
   def _print_status_header(self):
@@ -706,13 +818,10 @@ class StressRunner(object):
     if report.timed_out:
       increment(self._num_queries_timedout)
 
-  def _write_query_profile(self, report):
-    if not (report.profile and report.query_id):
-      return
-    file_name = report.query_id.replace(":", "_") + "_profile.txt"
-    profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
-    with open(profile_log_path, "w") as profile_log:
-      profile_log.write(report.profile)
+  def _write_query_profile(self, report, subdir, prefix=None):
+    report.write_query_profile(
+        os.path.join(self.results_dir, subdir),
+        prefix)
 
   def _check_successive_errors(self):
     if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
@@ -807,6 +916,8 @@ class Query(object):
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
+    self.solo_runtime_profile_with_spilling = None
+    self.solo_runtime_profile_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
     # Query options to set before running the query.
@@ -818,6 +929,8 @@ class Query(object):
     # UPSERT, DELETE.
     self.query_type = QueryType.SELECT
 
+    self._logical_query_id = None
+
   def __repr__(self):
     return dedent("""
         <Query
@@ -832,6 +945,29 @@ class Query(object):
         Population order: %(population_order)r>
         """.strip() % self.__dict__)
 
+  @property
+  def logical_query_id(self):
+    """
+    Return a meaningful unique str identifier for the query.
+
+    Example: "tpcds_300_decimal_parquet_q21"
+    """
+    if self._logical_query_id is None:
+      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+    return self._logical_query_id
+
+  def write_runtime_info_profiles(self, directory):
+    """Write profiles for spilling and non-spilling into directory (str)."""
+    profiles_to_write = [
+        (self.logical_query_id + "_profile_with_spilling.txt",
+         self.solo_runtime_profile_with_spilling),
+        (self.logical_query_id + "_profile_without_spilling.txt",
+         self.solo_runtime_profile_without_spilling),
+    ]
+    for filename, profile in profiles_to_write:
+      with open(os.path.join(directory, filename), "w") as fh:
+        fh.write(profile)
+
 
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
@@ -856,7 +992,7 @@ class QueryRunner(object):
       self.impalad_conn = None
 
   def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
-                should_cancel=False):
+                should_cancel=False, retain_profile=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
@@ -868,7 +1004,7 @@ class QueryRunner(object):
       raise Exception("connect() must first be called")
 
     timeout_unix_time = time() + timeout_secs
-    report = QueryReport()
+    report = QueryReport(query)
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
@@ -914,7 +1050,8 @@ class QueryRunner(object):
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-              if query.result_hash and report.result_hash != query.result_hash:
+              if retain_profile or \
+                 query.result_hash and report.result_hash != query.result_hash:
                 fetch_and_set_profile(cursor, report)
             except QueryTimeout:
               self._cancel(cursor, report)
@@ -1004,7 +1141,7 @@ class QueryRunner(object):
     def hash_result_impl():
       result_log = None
       try:
-        file_name = query_id.replace(":", "_")
+        file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
         if query.result_hash is None:
           file_name += "_initial"
         file_name += "_results.txt"
@@ -1159,7 +1296,8 @@ def populate_runtime_info_for_random_queries(
 
 def populate_runtime_info(
     query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
+    common_query_options=None
 ):
   """Runs the given query by itself repeatedly until the minimum memory is determined
   with and without spilling. Potentially all fields in the Query class (except
@@ -1177,6 +1315,7 @@ def populate_runtime_info(
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
+  runner.common_query_options = common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
   runner.use_kerberos = use_kerberos
@@ -1191,6 +1330,8 @@ def populate_runtime_info(
   old_required_mem_mb_without_spilling = query.required_mem_mb_without_spilling
   old_required_mem_mb_with_spilling = query.required_mem_mb_with_spilling
 
+  profile_error_prefix = query.logical_query_id + "_binsearch_error"
+
   # TODO: This method is complicated enough now that breaking it out into a class may be
   # helpful to understand the structure.
 
@@ -1203,30 +1344,45 @@ def populate_runtime_info(
       ):
         query.required_mem_mb_with_spilling = required_mem
         query.solo_runtime_secs_with_spilling = report.runtime_secs
+        query.solo_runtime_profile_with_spilling = report.profile
     elif (
         query.required_mem_mb_without_spilling is None or
         required_mem < query.required_mem_mb_without_spilling
     ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
+      query.solo_runtime_profile_without_spilling = report.profile
 
   def get_report(desired_outcome=None):
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
+      report = runner.run_query(query, timeout_secs, mem_limit,
+                                run_set_up=True, retain_profile=True)
       if report.timed_out:
-        raise QueryTimeout()
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise QueryTimeout(
+            "query {0} timed out during binary search".format(query.logical_query_id))
       if report.non_mem_limit_error:
-        raise report.non_mem_limit_error
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise Exception(
+            "query {0} errored during binary search: {1}".format(
+                query.logical_query_id, str(report.non_mem_limit_error)))
       LOG.debug("Spilled: %s" % report.mem_was_spilled)
       if not report.mem_limit_exceeded:
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
+          report.write_query_profile(
+              os.path.join(results_dir, PROFILES_DIR),
+              profile_error_prefix)
           raise Exception(
-              "Result hash mismatch; expected %s, got %s" %
-              (query.result_hash, report.result_hash))
+              "Result hash mismatch for query %s; expected %s, got %s" %
+              (query.logical_query_id, query.result_hash, report.result_hash))
 
       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1332,8 +1488,16 @@ def populate_runtime_info(
       upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
+        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+          # IMPALA-6604: A fair number of queries go down this path.
+          LOG.info(
+              "Unable to find a memory limit with spilling within the threshold of {0} "
+              "MB. Using the same memory limit for both.".format(
+                  MEM_LIMIT_EQ_THRESHOLD_MB))
         query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+        query.solo_runtime_profile_with_spilling = \
+            query.solo_runtime_profile_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
   if (
@@ -1349,6 +1513,7 @@ def populate_runtime_info(
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+    query.solo_runtime_profile_with_spilling = query.solo_runtime_profile_without_spilling
   LOG.debug("Query after populating runtime info: %s", query)
 
 
@@ -1420,9 +1585,12 @@ def save_runtime_info(path, query, impala):
     class JsonEncoder(json.JSONEncoder):
       def default(self, obj):
         data = dict(obj.__dict__)
-        # Queries are stored by sql, so remove the duplicate data.
-        if "sql" in data:
-          del data["sql"]
+        # Queries are stored by sql, so remove the duplicate data. Also don't store
+        # profiles as JSON values, but instead separately.
+        for k in ("sql", "solo_runtime_profile_with_spilling",
+                  "solo_runtime_profile_without_spilling"):
+          if k in data:
+            del data[k]
         return data
     json.dump(
         store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
@@ -1690,9 +1858,12 @@ def reset_databases(cursor):
                   " exist in '{1}' database.".format(table_name, cursor.db_name))
 
 
-def populate_all_queries(queries, impala, args, runtime_info_path,
-                         queries_with_runtime_info_by_db_sql_and_options):
+def populate_all_queries(
+    queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
+):
   """Populate runtime info for all queries, ordered by the population_order property."""
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1712,9 +1883,13 @@ def populate_all_queries(queries, impala, args, runtime_info_path,
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
         populate_runtime_info(
-            query, impala, args.use_kerberos, args.results_dir,
-            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+            query, impala, converted_args.use_kerberos, converted_args.results_dir,
+            samples=converted_args.samples,
+            max_conflicting_samples=converted_args.max_conflicting_samples,
+            common_query_options=common_query_options)
         save_runtime_info(runtime_info_path, query, impala)
+        query.write_runtime_info_profiles(
+            os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
   return result
 
@@ -1745,10 +1920,6 @@ def print_version(cluster):
 
 
 def main():
-  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
-  from random import shuffle
-  import tests.comparison.cli_options as cli_options
-
   parser = ArgumentParser(
       epilog=dedent("""
       Before running this script a CM cluster must be setup and any needed data
@@ -1891,6 +2062,9 @@ def main():
       "or set valid values. Example: --common-query-options "
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
+  converted_args = StressArgConverter(args)
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
 
   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -1906,40 +2080,6 @@ def main():
         "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
-  # The stress test sets these, so callers cannot override them.
-  IGNORE_QUERY_OPTIONS = frozenset([
-      'ABORT_ON_ERROR',
-      'MEM_LIMIT',
-  ])
-
-  common_query_options = {}
-  if args.common_query_options is not None:
-    for query_option_and_value in args.common_query_options:
-      try:
-        query_option, value = query_option_and_value.split('=')
-      except ValueError:
-        LOG.error(
-            "Could not parse --common-query-options: '{common_query_options}'".format(
-                common_query_options=args.common_query_options))
-        exit(1)
-      query_option = query_option.upper()
-      if query_option in common_query_options:
-        LOG.error(
-            "Query option '{query_option}' already defined in --common-query-options: "
-            "'{common_query_options}'".format(
-                query_option=query_option,
-                common_query_options=args.common_query_options))
-        exit(1)
-      elif query_option in IGNORE_QUERY_OPTIONS:
-        LOG.warn(
-            "Ignoring '{query_option}' in common query options: '{opt}': "
-            "The stress test algorithm needs control of this option.".format(
-                query_option=query_option, opt=args.common_query_options))
-      else:
-        common_query_options[query_option] = value
-        LOG.debug("Common query option '{query_option}' set to '{value}'".format(
-            query_option=query_option, value=value))
-
   os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
   os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
 
@@ -1956,9 +2096,6 @@ def main():
     raise Exception("Queries are currently running on the cluster")
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
 
-  runtime_info_path = args.runtime_info_path
-  if "{cm_host}" in runtime_info_path:
-    runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
       runtime_info_path, impala)
 
@@ -2026,8 +2163,8 @@ def main():
       with impala.cursor(db_name=database) as cursor:
         reset_databases(cursor)
 
-  queries = populate_all_queries(queries, impala, args, runtime_info_path,
-                                 queries_with_runtime_info_by_db_sql_and_options)
+  queries = populate_all_queries(
+      queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options)
 
   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries needs to be validated. Since the