You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/24 04:05:14 UTC

impala git commit: IMPALA-6904: stress test threshold parameters

Repository: impala
Updated Branches:
  refs/heads/master c1be4e967 -> 1dcc6c1be


IMPALA-6904: stress test threshold parameters

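Make the thresholds that end the memory-limit binary search configurable
via --mem-limit-eq-threshold-percent and --mem-limit-eq-threshold-mb
instead of hard-coded constants. I needed this to generate numbers for
test_mem_usage_scaling.py without a +/-50 MB error in them.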

Testing:
Ran a local stress test with TPC-H and random queries.

Change-Id: I46cc95cbb078c5ef9886971ab1c0f493ddcf8377
Reviewed-on: http://gerrit.cloudera.org:8080/9769
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1dcc6c1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1dcc6c1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1dcc6c1b

Branch: refs/heads/master
Commit: 1dcc6c1beb9c36528184aa5a90f5d3f258e32971
Parents: c1be4e9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Mar 22 15:11:12 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Apr 24 02:46:08 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 106 +++++++++++++++------------------
 1 file changed, 49 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1dcc6c1b/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 44d7b34..4316672 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -94,11 +94,6 @@ EXPECTED_TPCDS_QUERIES_COUNT = 71
 EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22
 EXPECTED_TPCH_QUERIES_COUNT = 22
 
-# Used to short circuit a binary search of the min mem limit. Values will be considered
-# equal if they are within this ratio or absolute amount of each other.
-MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
-MEM_LIMIT_EQ_THRESHOLD_MB = 50
-
 # Regex to extract the estimated memory from an explain plan.
 # The unit prefixes can be found in
 # fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -1245,8 +1240,7 @@ def load_queries_from_test_file(file_path, db_name=None):
 
 
 def load_random_queries_and_populate_runtime_info(
-    query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count,
-    query_timeout_secs, results_dir
+    query_generator, model_translator, tables, impala, converted_args
 ):
   """Returns a list of random queries. Each query will also have its runtime info
   populated. The runtime info population also serves to validate the query.
@@ -1259,16 +1253,13 @@ def load_random_queries_and_populate_runtime_info(
       sql = model_translator.write_query(query_model)
       query = Query()
       query.sql = sql
-      query.db_name = db_name
+      query.db_name = converted_args.random_db
       yield query
   return populate_runtime_info_for_random_queries(
-      impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs,
-      results_dir)
+      impala, generate_candidates(), converted_args)
 
 
-def populate_runtime_info_for_random_queries(
-    impala, use_kerberos, candidate_queries, query_count, query_timeout_secs, results_dir
-):
+def populate_runtime_info_for_random_queries(impala, candidate_queries, converted_args):
   """Returns a list of random queries. Each query will also have its runtime info
   populated. The runtime info population also serves to validate the query.
   """
@@ -1279,7 +1270,8 @@ def populate_runtime_info_for_random_queries(
   for query in candidate_queries:
     try:
       populate_runtime_info(
-          query, impala, use_kerberos, results_dir, timeout_secs=query_timeout_secs)
+          query, impala, converted_args,
+          timeout_secs=converted_args.random_query_timeout_seconds)
       queries.append(query)
     except Exception as e:
       # Ignore any non-fatal errors. These could be query timeouts or bad queries (
@@ -1289,36 +1281,37 @@ def populate_runtime_info_for_random_queries(
       LOG.warn(
           "Error running query (the test will continue)\n%s\n%s",
           e, query.sql, exc_info=True)
-    if len(queries) == query_count:
+    if len(queries) == converted_args.random_query_count:
       break
   return queries
 
 
-def populate_runtime_info(
-    query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
-    common_query_options=None
-):
+def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
   """Runs the given query by itself repeatedly until the minimum memory is determined
   with and without spilling. Potentially all fields in the Query class (except
   'sql') will be populated by this method. 'required_mem_mb_without_spilling' and
   the corresponding runtime field may still be None if the query could not be run
   without spilling.
 
-  'samples' and 'max_conflicting_samples' control the reliability of the collected
-  information. The problem is that memory spilling or usage may differ (by a large
-  amount) from run to run due to races during execution. The parameters provide a way
-  to express "X out of Y runs must have resulted in the same outcome". Increasing the
-  number of samples and decreasing the tolerance (max conflicts) increases confidence
-  but also increases the time to collect the data.
+  converted_args.samples and converted_args.max_conflicting_samples control the
+  reliability of the collected information. The problem is that memory spilling or usage
+  may differ (by a large amount) from run to run due to races during execution. The
+  parameters provide a way to express "X out of Y runs must have resulted in the same
+  outcome". Increasing the number of samples and decreasing the tolerance (max conflicts)
+  increases confidence but also increases the time to collect the data.
   """
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
+  samples = converted_args.samples
+  max_conflicting_samples = converted_args.max_conflicting_samples
+  results_dir = converted_args.results_dir
+  mem_limit_eq_threshold_mb = converted_args.mem_limit_eq_threshold_mb
+  mem_limit_eq_threshold_percent = converted_args.mem_limit_eq_threshold_percent
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
-  runner.common_query_options = common_query_options
+  runner.common_query_options = converted_args.common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
-  runner.use_kerberos = use_kerberos
+  runner.use_kerberos = converted_args.use_kerberos
   runner.connect()
   limit_exceeded_mem = 0
   non_spill_mem = None
@@ -1361,14 +1354,12 @@ def populate_runtime_info(
                                 run_set_up=True, retain_profile=True)
       if report.timed_out:
         report.write_query_profile(
-            os.path.join(results_dir, PROFILES_DIR),
-            profile_error_prefix)
+            os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise QueryTimeout(
             "query {0} timed out during binary search".format(query.logical_query_id))
       if report.non_mem_limit_error:
         report.write_query_profile(
-            os.path.join(results_dir, PROFILES_DIR),
-            profile_error_prefix)
+            os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise Exception(
             "query {0} errored during binary search: {1}".format(
                 query.logical_query_id, str(report.non_mem_limit_error)))
@@ -1378,8 +1369,7 @@ def populate_runtime_info(
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
           report.write_query_profile(
-              os.path.join(results_dir, PROFILES_DIR),
-              profile_error_prefix)
+              os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
           raise Exception(
               "Result hash mismatch for query %s; expected %s, got %s" %
               (query.logical_query_id, query.result_hash, report.result_hash))
@@ -1416,6 +1406,7 @@ def populate_runtime_info(
     LOG.info("Finding a starting point for binary search")
     mem_limit = min(mem_estimate, impala.min_impalad_mem_mb) or impala.min_impalad_mem_mb
     while True:
+      LOG.info("Next mem_limit: {0}".format(mem_limit))
       report = get_report()
       if not report or report.mem_limit_exceeded:
         if report and report.mem_limit_exceeded:
@@ -1442,8 +1433,9 @@ def populate_runtime_info(
       old_required_mem_mb_without_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC or \
-        upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    LOG.info("Next mem_limit: {0}".format(mem_limit))
+    should_break = mem_limit / float(upper_bound) > 1 - mem_limit_eq_threshold_percent \
+        or upper_bound - mem_limit < mem_limit_eq_threshold_mb
     report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None))
     if not report:
       lower_bound = mem_limit
@@ -1478,8 +1470,9 @@ def populate_runtime_info(
       old_required_mem_mb_with_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \
-        or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    LOG.info("Next mem_limit: {0}".format(mem_limit))
+    should_break = mem_limit / float(upper_bound) > 1 - mem_limit_eq_threshold_percent \
+        or upper_bound - mem_limit < mem_limit_eq_threshold_mb
     report = get_report(desired_outcome="SPILLED")
     if not report or report.mem_limit_exceeded:
       lower_bound = mem_limit
@@ -1488,12 +1481,12 @@ def populate_runtime_info(
       upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
-        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+        if upper_bound - mem_limit < mem_limit_eq_threshold_mb:
           # IMPALA-6604: A fair amount of queries go down this path.
           LOG.info(
               "Unable to find a memory limit with spilling within the threshold of {0} "
               "MB. Using the same memory limit for both.".format(
-                  MEM_LIMIT_EQ_THRESHOLD_MB))
+                  mem_limit_eq_threshold_mb))
         query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
         query.solo_runtime_profile_with_spilling = \
@@ -1862,8 +1855,6 @@ def populate_all_queries(
     queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
 ):
   """Populate runtime info for all queries, ordered by the population_order property."""
-  common_query_options = converted_args.common_query_options
-  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1882,12 +1873,8 @@ def populate_all_queries(
         result.append(queries_with_runtime_info_by_db_sql_and_options[
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
-        populate_runtime_info(
-            query, impala, converted_args.use_kerberos, converted_args.results_dir,
-            samples=converted_args.samples,
-            max_conflicting_samples=converted_args.max_conflicting_samples,
-            common_query_options=common_query_options)
-        save_runtime_info(runtime_info_path, query, impala)
+        populate_runtime_info(query, impala, converted_args)
+        save_runtime_info(converted_args.runtime_info_path, query, impala)
         query.write_runtime_info_profiles(
             os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
@@ -1951,6 +1938,16 @@ def main():
       ' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem'
       ' limit.')
   parser.add_argument(
+      "--mem-limit-eq-threshold-percent", default=0.025,
+      type=float, help='Used when collecting "runtime info". If the difference between'
+      ' two memory limits is less than this percentage, we consider the two limits to'
+      ' be equal and stop the memory binary search.')
+  parser.add_argument(
+      "--mem-limit-eq-threshold-mb", default=50,
+      type=int, help='Used when collecting "runtime info". If the difference between'
+      ' two memory limits is less than this value in MB, we consider the two limits to'
+      ' be equal and stop the memory binary search.')
+  parser.add_argument(
       "--results-dir", default=gettempdir(),
       help="Directory under which the profiles and result_hashes directories are created."
       " Query hash results are written in the result_hashes directory. If query results"
@@ -2063,8 +2060,6 @@ def main():
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
   converted_args = StressArgConverter(args)
-  common_query_options = converted_args.common_query_options
-  runtime_info_path = converted_args.runtime_info_path
 
   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -2097,7 +2092,7 @@ def main():
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
 
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
-      runtime_info_path, impala)
+      converted_args.runtime_info_path, impala)
 
   # Start loading the test queries.
   queries = list()
@@ -2174,17 +2169,14 @@ def main():
     with impala.cursor(db_name=args.random_db) as cursor:
       tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
     queries.extend(load_random_queries_and_populate_runtime_info(
-        query_generator, SqlWriter.create(), tables, args.random_db, impala,
-        args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds,
-        args.results_dir))
+        query_generator, SqlWriter.create(), tables, impala, converted_args))
 
   if args.query_file_path:
     file_queries = load_queries_from_test_file(
         args.query_file_path, db_name=args.query_file_db)
     shuffle(file_queries)
     queries.extend(populate_runtime_info_for_random_queries(
-        impala, args.use_kerberos, file_queries, args.random_query_count,
-        args.random_query_timeout_seconds, args.results_dir))
+        impala, file_queries, converted_args))
 
   # Apply tweaks to the query's runtime info as requested by CLI options.
   for idx in xrange(len(queries) - 1, -1, -1):
@@ -2261,7 +2253,7 @@ def main():
   stress_runner.cancel_probability = args.cancel_probability
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
-  stress_runner.common_query_options = common_query_options
+  stress_runner.common_query_options = converted_args.common_query_options
   stress_runner.run_queries(
       queries, impala, args.max_queries, args.mem_overcommit_pct,
       should_print_status=not args.no_status,