Posted to commits@impala.apache.org by ph...@apache.org on 2018/03/31 23:35:05 UTC
[04/12] impala git commit: IMPALA-5721, IMPALA-6717, IMPALA-6738: improve stress test binary search
IMPALA-5721,IMPALA-6717,IMPALA-6738: improve stress test binary search
IMPALA-5721:
- Save profiles of queries at the end of both the spilling and
non-spilling binary search. These were not being saved before. Note
these profiles won't have ExecSummary until IMPALA-6640 is addressed.
- Save the profile of any query that produces incorrect results during
binary search. These were not being saved before, either.
- Use descriptive names, like
tpch_100_parquet_q12_profile_without_spilling.txt, for profiles
mentioned above. We do this by introducing the concept of a
"logical_query_id" whose values look like "tpch_100_parquet_q12".
- Use the logical_query_id in critical error paths and include it in
result hash file names (sketched just below).
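As a rough illustration of the naming scheme, a minimal sketch follows; the
helper names and the example query ID here are hypothetical, and the real
logic lives in Query.logical_query_id and QueryReport.write_query_profile in
the diff below:

    def logical_query_id(db_name, query_name):
        # e.g. ("tpch_100_parquet", "q12") -> "tpch_100_parquet_q12"
        return "{0}_{1}".format(db_name, query_name)

    def profile_file_name(logical_id, query_id, prefix=None):
        # e.g. ("tpch_100_parquet_q12", "a4:38c8", "error")
        #   -> "error_tpch_100_parquet_q12_a4_38c8_profile.txt"
        name = (prefix + "_") if prefix else ""
        return name + logical_id + "_" + query_id.replace(":", "_") + "_profile.txt"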
IMPALA-6717:
- Plumb --common-query-options through to the binary search.
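For reference, the options are passed as KEY=VALUE pairs on the command line
(e.g. --common-query-options DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1) and
end up as a dict, roughly as in this sketch; the real, stricter parsing is in
StressArgConverter.common_query_options in the diff below:

    IGNORE_QUERY_OPTIONS = frozenset(['ABORT_ON_ERROR', 'MEM_LIMIT'])

    def parse_common_query_options(pairs):
        # e.g. ["DISABLE_CODEGEN=true", "RUNTIME_FILTER_MODE=1"]
        #   -> {"DISABLE_CODEGEN": "true", "RUNTIME_FILTER_MODE": "1"}
        options = {}
        for pair in pairs or []:
            option, value = pair.split('=')  # malformed input raises ValueError
            option = option.upper()
            if option in IGNORE_QUERY_OPTIONS:
                continue  # the stress test controls these options itself
            options[option] = value
        return options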
IMPALA-6738:
- Begin a refactoring to reduce the number of parameters used when doing
the binary search.
- Introduce a notion of "converted args" via a class that does the
conversion (if needed) through property getters (a minimal sketch of the
pattern follows below).
- Adjust populate_all_queries() to use converted_args.
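A minimal sketch of the property-getter pattern; the class name here is
illustrative, and the real implementation is StressArgConverter in the diff
below:

    class ConvertedArgsSketch(object):
        def __init__(self, args):
            self._args = args  # an argparse.Namespace

        def __getattr__(self, attr):
            # Only called when normal lookup fails, so plain arguments are
            # proxied through from the Namespace unconverted.
            return getattr(self._args, attr)

        @property
        def runtime_info_path(self):
            # Example conversion: substitute the CM host into the raw path.
            path = self._args.runtime_info_path
            if "{cm_host}" in path:
                path = path.format(cm_host=self._args.cm_host)
            return path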
Change-Id: I33d036ec93df3016cd4703205078dbdba0168acb
Reviewed-on: http://gerrit.cloudera.org:8080/9770
Reviewed-by: David Knupp <dk...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/382d7793
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/382d7793
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/382d7793
Branch: refs/heads/2.x
Commit: 382d7793955aa546082fdf0d7b28e176e22db99f
Parents: ee05cf5
Author: Michael Brown <mi...@cloudera.com>
Authored: Wed Feb 28 11:02:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 29 03:46:07 2018 +0000
----------------------------------------------------------------------
tests/stress/concurrent_select.py | 289 ++++++++++++++++++++++++---------
1 file changed, 213 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/382d7793/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..44d7b34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,17 +62,19 @@ import sys
import threading
import traceback
from Queue import Empty # Must be before Queue below
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from collections import defaultdict
from contextlib import contextmanager
from datetime import datetime
from multiprocessing import Lock, Process, Queue, Value
-from random import choice, random, randrange
+from random import choice, random, randrange, shuffle
from sys import exit, maxint
from tempfile import gettempdir
from textwrap import dedent
from threading import current_thread, Thread
from time import sleep, time
+import tests.comparison.cli_options as cli_options
import tests.util.test_file_parser as test_file_parser
from tests.comparison.cluster import Timeout
from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
@@ -110,6 +112,83 @@ RESULT_HASHES_DIR = "result_hashes"
RUNTIME_INFO_FILE_VERSION = 3
+class StressArgConverter(object):
+ def __init__(self, args):
+ """
+ Convert arguments as returned from argparse parse_args() into internal forms.
+
+ The purpose of this object is to do any conversions needed from the type given by
+ parse_args() into internal forms. For example, if a command-line option takes in a
+ complicated string that needs to be converted into a list or dictionary, this is the
+ place to do it. Access works the same as on the object returned by parse_args(),
+ i.e., object.option_attribute.
+
+ Simple arguments usually needn't be converted, because argparse already handles the
+ type conversion (e.g., "8" <str> to 8 <int>), and in most cases that is all that's
+ needed. If a property getter below doesn't exist, it means the argument value is
+ just passed along unconverted.
+
+ Params:
+ args: argparse.Namespace object (from argparse.ArgumentParser().parse_args())
+ """
+ assert isinstance(args, Namespace), "expected Namespace, got " + str(type(args))
+ self._args = args
+ self._common_query_options = None
+
+ def __getattr__(self, attr):
+ # This "proxies through" all the attributes from the Namespace object that are not
+ # defined in this object via property getters below.
+ return getattr(self._args, attr)
+
+ @property
+ def common_query_options(self):
+ # Memoize this, as the integrity checking of --common-query-options need only
+ # happen once.
+ if self._common_query_options is not None:
+ return self._common_query_options
+ # The stress test sets these, so callers cannot override them.
+ IGNORE_QUERY_OPTIONS = frozenset([
+ 'ABORT_ON_ERROR',
+ 'MEM_LIMIT',
+ ])
+ common_query_options = {}
+ if self._args.common_query_options is not None:
+ for query_option_and_value in self._args.common_query_options:
+ try:
+ query_option, value = query_option_and_value.split('=')
+ except ValueError:
+ LOG.error(
+ "Could not parse --common-query-options: '{common_query_options}'".format(
+ common_query_options=self._args.common_query_options))
+ exit(1)
+ query_option = query_option.upper()
+ if query_option in common_query_options:
+ LOG.error(
+ "Query option '{query_option}' already defined in --common-query-options: "
+ "'{common_query_options}'".format(
+ query_option=query_option,
+ common_query_options=self._args.common_query_options))
+ exit(1)
+ elif query_option in IGNORE_QUERY_OPTIONS:
+ LOG.warn(
+ "Ignoring '{query_option}' in common query options: '{opt}': "
+ "The stress test algorithm needs control of this option.".format(
+ query_option=query_option, opt=self._args.common_query_options))
+ else:
+ common_query_options[query_option] = value
+ LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+ query_option=query_option, value=value))
+ self._common_query_options = common_query_options
+ return self._common_query_options
+
+ @property
+ def runtime_info_path(self):
+ runtime_info_path = self._args.runtime_info_path
+ if "{cm_host}" in runtime_info_path:
+ runtime_info_path = runtime_info_path.format(cm_host=self._args.cm_host)
+ return runtime_info_path
+
+
def create_and_start_daemon_thread(fn, name):
thread = Thread(target=fn, name=name)
thread.error = None
@@ -169,7 +248,9 @@ def print_crash_info_if_exists(impala, start_time):
class QueryReport(object):
"""Holds information about a single query run."""
- def __init__(self):
+ def __init__(self, query):
+ self.query = query
+
self.result_hash = None
self.runtime_secs = None
self.mem_was_spilled = False
@@ -180,6 +261,32 @@ class QueryReport(object):
self.profile = None
self.query_id = None
+ def write_query_profile(self, directory, prefix=None):
+ """
+ Write out the query profile bound to this object to a given directory.
+
+ The file name is generated and will contain the query ID. Use the optional prefix
+ parameter to set a prefix on the filename.
+
+ Example file name:
+ tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt
+
+ Parameters:
+ directory (str): Directory to write profile.
+ prefix (str): Prefix for filename.
+ """
+ if not (self.profile and self.query_id):
+ return
+ if prefix is not None:
+ file_name = prefix + '_'
+ else:
+ file_name = ''
+ file_name += self.query.logical_query_id + '_'
+ file_name += self.query_id.replace(":", "_") + "_profile.txt"
+ profile_log_path = os.path.join(directory, file_name)
+ with open(profile_log_path, "w") as profile_log:
+ profile_log.write(self.profile)
+
class MemBroker(object):
"""Provides memory usage coordination for clients running in different processes.
@@ -632,14 +739,18 @@ class StressRunner(object):
continue
increment(self._num_successive_errors)
increment(self._num_other_errors)
- self._write_query_profile(report)
- raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
+ self._write_query_profile(report, PROFILES_DIR, prefix='error')
+ raise Exception("Query {query} ID {id} failed: {mesg}".format(
+ query=query.logical_query_id,
+ id=report.query_id,
+ mesg=error_msg))
if (
report.mem_limit_exceeded and
not self._mem_broker.was_overcommitted(reservation_id)
):
increment(self._num_successive_errors)
- self._write_query_profile(report)
+ self._write_query_profile(
+ report, PROFILES_DIR, prefix='unexpected_mem_exceeded')
raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
"Query ID: {0}".format(report.query_id))
if (
@@ -649,18 +760,19 @@ class StressRunner(object):
):
increment(self._num_successive_errors)
increment(self._num_result_mismatches)
- self._write_query_profile(report)
+ self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
raise Exception(dedent("""\
Result hash mismatch; expected {expected}, got {actual}
Query ID: {id}
Query: {query}""".format(expected=query.result_hash,
actual=report.result_hash,
id=report.query_id,
- query=query.sql)))
+ query=query.logical_query_id)))
if report.timed_out and not should_cancel:
- self._write_query_profile(report)
+ self._write_query_profile(report, PROFILES_DIR, prefix='timed_out')
raise Exception(
- "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
+ "Query {query} unexpectedly timed out. Query ID: {id}".format(
+ query=query.logical_query_id, id=report.query_id))
self._num_successive_errors.value = 0
def _print_status_header(self):
@@ -706,13 +818,10 @@ class StressRunner(object):
if report.timed_out:
increment(self._num_queries_timedout)
- def _write_query_profile(self, report):
- if not (report.profile and report.query_id):
- return
- file_name = report.query_id.replace(":", "_") + "_profile.txt"
- profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
- with open(profile_log_path, "w") as profile_log:
- profile_log.write(report.profile)
+ def _write_query_profile(self, report, subdir, prefix=None):
+ report.write_query_profile(
+ os.path.join(self.results_dir, subdir),
+ prefix)
def _check_successive_errors(self):
if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
@@ -807,6 +916,8 @@ class Query(object):
self.result_hash = None
self.required_mem_mb_with_spilling = None
self.required_mem_mb_without_spilling = None
+ self.solo_runtime_profile_with_spilling = None
+ self.solo_runtime_profile_without_spilling = None
self.solo_runtime_secs_with_spilling = None
self.solo_runtime_secs_without_spilling = None
# Query options to set before running the query.
@@ -818,6 +929,8 @@ class Query(object):
# UPSERT, DELETE.
self.query_type = QueryType.SELECT
+ self._logical_query_id = None
+
def __repr__(self):
return dedent("""
<Query
@@ -832,6 +945,29 @@ class Query(object):
Population order: %(population_order)r>
""".strip() % self.__dict__)
+ @property
+ def logical_query_id(self):
+ """
+ Return a meaningful, unique str identifier for the query.
+
+ Example: "tpcds_300_decimal_parquet_q21"
+ """
+ if self._logical_query_id is None:
+ self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+ return self._logical_query_id
+
+ def write_runtime_info_profiles(self, directory):
+ """Write profiles for spilling and non-spilling into directory (str)."""
+ profiles_to_write = [
+ (self.logical_query_id + "_profile_with_spilling.txt",
+ self.solo_runtime_profile_with_spilling),
+ (self.logical_query_id + "_profile_without_spilling.txt",
+ self.solo_runtime_profile_without_spilling),
+ ]
+ for filename, profile in profiles_to_write:
+ with open(os.path.join(directory, filename), "w") as fh:
+ fh.write(profile)
+
class QueryRunner(object):
"""Encapsulates functionality to run a query and provide a runtime report."""
@@ -856,7 +992,7 @@ class QueryRunner(object):
self.impalad_conn = None
def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
- should_cancel=False):
+ should_cancel=False, retain_profile=False):
"""Run a query and return an execution report. If 'run_set_up' is True, set up sql
will be executed before the main query. This should be the case during the binary
search phase of the stress test.
@@ -868,7 +1004,7 @@ class QueryRunner(object):
raise Exception("connect() must first be called")
timeout_unix_time = time() + timeout_secs
- report = QueryReport()
+ report = QueryReport(query)
try:
with self.impalad_conn.cursor() as cursor:
start_time = time()
@@ -914,7 +1050,8 @@ class QueryRunner(object):
if query.query_type == QueryType.SELECT:
try:
report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
- if query.result_hash and report.result_hash != query.result_hash:
+ if retain_profile or \
+ query.result_hash and report.result_hash != query.result_hash:
fetch_and_set_profile(cursor, report)
except QueryTimeout:
self._cancel(cursor, report)
@@ -1004,7 +1141,7 @@ class QueryRunner(object):
def hash_result_impl():
result_log = None
try:
- file_name = query_id.replace(":", "_")
+ file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
if query.result_hash is None:
file_name += "_initial"
file_name += "_results.txt"
@@ -1159,7 +1296,8 @@ def populate_runtime_info_for_random_queries(
def populate_runtime_info(
query, impala, use_kerberos, results_dir,
- timeout_secs=maxint, samples=1, max_conflicting_samples=0
+ timeout_secs=maxint, samples=1, max_conflicting_samples=0,
+ common_query_options=None
):
"""Runs the given query by itself repeatedly until the minimum memory is determined
with and without spilling. Potentially all fields in the Query class (except
@@ -1177,6 +1315,7 @@ def populate_runtime_info(
LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
runner = QueryRunner()
runner.check_if_mem_was_spilled = True
+ runner.common_query_options = common_query_options
runner.impalad = impala.impalads[0]
runner.results_dir = results_dir
runner.use_kerberos = use_kerberos
@@ -1191,6 +1330,8 @@ def populate_runtime_info(
old_required_mem_mb_without_spilling = query.required_mem_mb_without_spilling
old_required_mem_mb_with_spilling = query.required_mem_mb_with_spilling
+ profile_error_prefix = query.logical_query_id + "_binsearch_error"
+
# TODO: This method is complicated enough now that breaking it out into a class may be
# helpful to understand the structure.
@@ -1203,30 +1344,45 @@ def populate_runtime_info(
):
query.required_mem_mb_with_spilling = required_mem
query.solo_runtime_secs_with_spilling = report.runtime_secs
+ query.solo_runtime_profile_with_spilling = report.profile
elif (
query.required_mem_mb_without_spilling is None or
required_mem < query.required_mem_mb_without_spilling
):
query.required_mem_mb_without_spilling = required_mem
query.solo_runtime_secs_without_spilling = report.runtime_secs
+ query.solo_runtime_profile_without_spilling = report.profile
def get_report(desired_outcome=None):
reports_by_outcome = defaultdict(list)
leading_outcome = None
for remaining_samples in xrange(samples - 1, -1, -1):
- report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
+ report = runner.run_query(query, timeout_secs, mem_limit,
+ run_set_up=True, retain_profile=True)
if report.timed_out:
- raise QueryTimeout()
+ report.write_query_profile(
+ os.path.join(results_dir, PROFILES_DIR),
+ profile_error_prefix)
+ raise QueryTimeout(
+ "query {0} timed out during binary search".format(query.logical_query_id))
if report.non_mem_limit_error:
- raise report.non_mem_limit_error
+ report.write_query_profile(
+ os.path.join(results_dir, PROFILES_DIR),
+ profile_error_prefix)
+ raise Exception(
+ "query {0} errored during binary search: {1}".format(
+ query.logical_query_id, str(report.non_mem_limit_error)))
LOG.debug("Spilled: %s" % report.mem_was_spilled)
if not report.mem_limit_exceeded:
if query.result_hash is None:
query.result_hash = report.result_hash
elif query.result_hash != report.result_hash:
+ report.write_query_profile(
+ os.path.join(results_dir, PROFILES_DIR),
+ profile_error_prefix)
raise Exception(
- "Result hash mismatch; expected %s, got %s" %
- (query.result_hash, report.result_hash))
+ "Result hash mismatch for query %s; expected %s, got %s" %
+ (query.logical_query_id, query.result_hash, report.result_hash))
if report.mem_limit_exceeded:
outcome = "EXCEEDED"
@@ -1332,8 +1488,16 @@ def populate_runtime_info(
upper_bound = mem_limit
if should_break:
if not query.required_mem_mb_with_spilling:
+ if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+ # IMPALA-6604: A fair number of queries go down this path.
+ LOG.info(
+ "Unable to find a memory limit with spilling within the threshold of {0} "
+ "MB. Using the same memory limit for both.".format(
+ MEM_LIMIT_EQ_THRESHOLD_MB))
query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+ query.solo_runtime_profile_with_spilling = \
+ query.solo_runtime_profile_without_spilling
break
LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
if (
@@ -1349,6 +1513,7 @@ def populate_runtime_info(
" the absolute minimum memory.")
query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+ query.solo_runtime_profile_with_spilling = query.solo_runtime_profile_without_spilling
LOG.debug("Query after populating runtime info: %s", query)
@@ -1420,9 +1585,12 @@ def save_runtime_info(path, query, impala):
class JsonEncoder(json.JSONEncoder):
def default(self, obj):
data = dict(obj.__dict__)
- # Queries are stored by sql, so remove the duplicate data.
- if "sql" in data:
- del data["sql"]
+ # Queries are stored by sql, so remove the duplicate data. Also don't store
+ # profiles as JSON values, but instead separately.
+ for k in ("sql", "solo_runtime_profile_with_spilling",
+ "solo_runtime_profile_without_spilling"):
+ if k in data:
+ del data[k]
return data
json.dump(
store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
@@ -1690,9 +1858,12 @@ def reset_databases(cursor):
" exist in '{1}' database.".format(table_name, cursor.db_name))
-def populate_all_queries(queries, impala, args, runtime_info_path,
- queries_with_runtime_info_by_db_sql_and_options):
+def populate_all_queries(
+ queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
+):
"""Populate runtime info for all queries, ordered by the population_order property."""
+ common_query_options = converted_args.common_query_options
+ runtime_info_path = converted_args.runtime_info_path
result = []
queries_by_order = {}
for query in queries:
@@ -1712,9 +1883,13 @@ def populate_all_queries(queries, impala, args, runtime_info_path,
query.db_name][query.sql][str(sorted(query.options.items()))])
else:
populate_runtime_info(
- query, impala, args.use_kerberos, args.results_dir,
- samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+ query, impala, converted_args.use_kerberos, converted_args.results_dir,
+ samples=converted_args.samples,
+ max_conflicting_samples=converted_args.max_conflicting_samples,
+ common_query_options=common_query_options)
save_runtime_info(runtime_info_path, query, impala)
+ query.write_runtime_info_profiles(
+ os.path.join(converted_args.results_dir, PROFILES_DIR))
result.append(query)
return result
@@ -1745,10 +1920,6 @@ def print_version(cluster):
def main():
- from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
- from random import shuffle
- import tests.comparison.cli_options as cli_options
-
parser = ArgumentParser(
epilog=dedent("""
Before running this script a CM cluster must be setup and any needed data
@@ -1891,6 +2062,9 @@ def main():
"or set valid values. Example: --common-query-options "
"DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
args = parser.parse_args()
+ converted_args = StressArgConverter(args)
+ common_query_options = converted_args.common_query_options
+ runtime_info_path = converted_args.runtime_info_path
cli_options.configure_logging(
args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -1906,40 +2080,6 @@ def main():
"At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
"--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
- # The stress test sets these, so callers cannot override them.
- IGNORE_QUERY_OPTIONS = frozenset([
- 'ABORT_ON_ERROR',
- 'MEM_LIMIT',
- ])
-
- common_query_options = {}
- if args.common_query_options is not None:
- for query_option_and_value in args.common_query_options:
- try:
- query_option, value = query_option_and_value.split('=')
- except ValueError:
- LOG.error(
- "Could not parse --common-query-options: '{common_query_options}'".format(
- common_query_options=args.common_query_options))
- exit(1)
- query_option = query_option.upper()
- if query_option in common_query_options:
- LOG.error(
- "Query option '{query_option}' already defined in --common-query-options: "
- "'{common_query_options}'".format(
- query_option=query_option,
- common_query_options=args.common_query_options))
- exit(1)
- elif query_option in IGNORE_QUERY_OPTIONS:
- LOG.warn(
- "Ignoring '{query_option}' in common query options: '{opt}': "
- "The stress test algorithm needs control of this option.".format(
- query_option=query_option, opt=args.common_query_options))
- else:
- common_query_options[query_option] = value
- LOG.debug("Common query option '{query_option}' set to '{value}'".format(
- query_option=query_option, value=value))
-
os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
@@ -1956,9 +2096,6 @@ def main():
raise Exception("Queries are currently running on the cluster")
impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
- runtime_info_path = args.runtime_info_path
- if "{cm_host}" in runtime_info_path:
- runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
runtime_info_path, impala)
@@ -2026,8 +2163,8 @@ def main():
with impala.cursor(db_name=database) as cursor:
reset_databases(cursor)
- queries = populate_all_queries(queries, impala, args, runtime_info_path,
- queries_with_runtime_info_by_db_sql_and_options)
+ queries = populate_all_queries(
+ queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options)
# A particular random query may either fail (due to a generator or Impala bug) or
# take a really long time to complete. So the queries needs to be validated. Since the