You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2019/02/27 04:12:36 UTC

[impala] 01/03: [stress] pull out query and runtime info loading

This is an automated email from the ASF dual-hosted git repository.

kwho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0cc9371f0fce8b1c341d9cf034afe18590246070
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Feb 24 23:23:24 2019 -0800

    [stress] pull out query and runtime info loading
    
    This is an incremental step to reduce the size of concurrent_select.py.
    It is now "only" 2019 lines long.
    
    Testing:
    Ran various command-line invocations locally, including random and
    DML queries. Running a cluster stress test with the modifications.
    
    Change-Id: If65069cd1678cdf71091bd2601bc1fc1d745cec5
    Reviewed-on: http://gerrit.cloudera.org:8080/12576
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/comparison/cluster.py       |  12 ++
 tests/stress/concurrent_select.py | 424 ++------------------------------------
 tests/stress/queries.py           | 301 +++++++++++++++++++++++++++
 tests/stress/runtime_info.py      | 152 ++++++++++++++
 4 files changed, 479 insertions(+), 410 deletions(-)

diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index 33f29a9..2f02e10 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -171,6 +171,18 @@ class Cluster(object):
       self._init_impala()
     return self._impala
 
+  def print_version(self):
+    """
+    Print the cluster impalad version info to the console sorted by hostname.
+    """
+    def _sorter(i1, i2):
+      return cmp(i1.host_name, i2.host_name)
+
+    version_info = self.impala.get_version_info()
+    print("Cluster Impalad Version Info:")
+    for impalad in sorted(version_info.keys(), cmp=_sorter):
+      print("{0}: {1}".format(impalad.host_name, version_info[impalad]))
+
 
 class MiniCluster(Cluster):
 
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 67c5628..837b964 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -57,7 +57,6 @@
 
 from __future__ import print_function
 
-import json
 import logging
 import os
 import re
@@ -80,15 +79,15 @@ from threading import current_thread, Thread
 from time import sleep, time
 
 import tests.comparison.cli_options as cli_options
-import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
 from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
-from tests.comparison.model_translator import SqlWriter
-from tests.comparison.query_generator import QueryGenerator
-from tests.comparison.query_profile import DefaultProfile
+from tests.stress.runtime_info import save_runtime_info, load_runtime_info
+from tests.stress.queries import (QueryType, generate_compute_stats_queries,
+    generate_DML_queries, generate_random_queries, load_tpc_queries,
+    load_queries_from_test_file, estimate_query_mem_mb_usage)
 from tests.util.parse_util import (
     EXPECTED_TPCDS_QUERIES_COUNT, EXPECTED_TPCH_NESTED_QUERIES_COUNT,
-    EXPECTED_TPCH_STRESS_QUERIES_COUNT, match_memory_estimate, parse_mem_to_mb)
+    EXPECTED_TPCH_STRESS_QUERIES_COUNT)
 from tests.util.thrift_util import op_handle_to_query_id
 
 LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
@@ -96,9 +95,6 @@ LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
 PROFILES_DIR = "profiles"
 RESULT_HASHES_DIR = "result_hashes"
 
-# The version of the file format containing the collected query runtime info.
-RUNTIME_INFO_FILE_VERSION = 3
-
 # Metrics collected during the stress running process.
 NUM_QUERIES_DEQUEUED = "num_queries_dequeued"
 # The number of queries that were submitted to a query runner.
@@ -1024,80 +1020,6 @@ class QueryTimeout(Exception):
   pass
 
 
-class QueryType(object):
-  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
-
-
-class Query(object):
-  """Contains a SQL statement along with expected runtime information."""
-
-  def __init__(self):
-    self.name = None
-    self.sql = None
-    # In order to be able to make good estimates for DML queries in the binary search,
-    # we need to bring the table to a good initial state before excuting the sql. Running
-    # set_up_sql accomplishes this task.
-    self.set_up_sql = None
-    self.db_name = None
-    self.result_hash = None
-    self.required_mem_mb_with_spilling = None
-    self.required_mem_mb_without_spilling = None
-    self.solo_runtime_profile_with_spilling = None
-    self.solo_runtime_profile_without_spilling = None
-    self.solo_runtime_secs_with_spilling = None
-    self.solo_runtime_secs_without_spilling = None
-    # Query options to set before running the query.
-    self.options = {}
-    # Determines the order in which we will populate query runtime info. Queries with the
-    # lowest population_order property will be handled first.
-    self.population_order = 0
-    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
-    # UPSERT, DELETE.
-    self.query_type = QueryType.SELECT
-
-    self._logical_query_id = None
-
-  def __repr__(self):
-    return dedent("""
-        <Query
-        Mem: %(required_mem_mb_with_spilling)s
-        Mem no-spilling: %(required_mem_mb_without_spilling)s
-        Solo Runtime: %(solo_runtime_secs_with_spilling)s
-        Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
-        DB: %(db_name)s
-        Options: %(options)s
-        Set up SQL: %(set_up_sql)s>
-        SQL: %(sql)s>
-        Population order: %(population_order)r>
-        """.strip() % self.__dict__)
-
-  @property
-  def logical_query_id(self):
-    """
-    Return a meanginful unique str identifier for the query.
-
-    Example: "tpcds_300_decimal_parquet_q21"
-    """
-    if self._logical_query_id is None:
-      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
-    return self._logical_query_id
-
-  def write_runtime_info_profiles(self, directory):
-    """Write profiles for spilling and non-spilling into directory (str)."""
-    profiles_to_write = [
-        (self.logical_query_id + "_profile_with_spilling.txt",
-         self.solo_runtime_profile_with_spilling),
-        (self.logical_query_id + "_profile_without_spilling.txt",
-         self.solo_runtime_profile_without_spilling),
-    ]
-    for filename, profile in profiles_to_write:
-      if profile is None:
-        LOG.debug("No profile recorded for {0}".format(filename))
-        continue
-      with open(os.path.join(directory, filename), "w") as fh:
-        fh.write(profile)
-
-
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
 
@@ -1401,54 +1323,20 @@ class QueryRunner(object):
     return hash_thread.result
 
 
-def load_tpc_queries(workload):
-  """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
-  LOG.info("Loading %s queries", workload)
-  queries = []
-  for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
-      include_stress_queries=True).iteritems():
-    query = Query()
-    query.name = query_name
-    query.sql = query_sql
-    queries.append(query)
-  return queries
-
-
-def load_queries_from_test_file(file_path, db_name=None):
-  LOG.debug("Loading queries from %s", file_path)
-  test_cases = test_file_parser.parse_query_test_file(file_path)
-  queries = list()
-  for test_case in test_cases:
-    query = Query()
-    query.sql = test_file_parser.remove_comments(test_case["QUERY"])
-    query.db_name = db_name
-    queries.append(query)
-  return queries
-
-
-def load_random_queries_and_populate_runtime_info(
-    query_generator, model_translator, tables, impala, converted_args
-):
+def load_random_queries_and_populate_runtime_info(impala, converted_args):
   """Returns a list of random queries. Each query will also have its runtime info
   populated. The runtime info population also serves to validate the query.
   """
   LOG.info("Generating random queries")
-
-  def generate_candidates():
-    while True:
-      query_model = query_generator.generate_statement(tables)
-      sql = model_translator.write_query(query_model)
-      query = Query()
-      query.sql = sql
-      query.db_name = converted_args.random_db
-      yield query
   return populate_runtime_info_for_random_queries(
-      impala, generate_candidates(), converted_args)
+      impala, generate_random_queries(impala, converted_args.random_db), converted_args)
 
 
 def populate_runtime_info_for_random_queries(impala, candidate_queries, converted_args):
-  """Returns a list of random queries. Each query will also have its runtime info
-  populated. The runtime info population also serves to validate the query.
+  """Returns a list of random queries selected from candidate queries, which should be
+  a generator that will return an unlimited number of randomly generated queries.
+  Each query will also have its runtime info populated. The runtime info population
+  also serves to validate the query.
   """
   start_time = datetime.now()
   queries = list()
@@ -1591,7 +1479,7 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
     return reports[len(reports) / 2]
 
   if not any((old_required_mem_mb_with_spilling, old_required_mem_mb_without_spilling)):
-    mem_estimate = estimate_query_mem_mb_usage(query, runner)
+    mem_estimate = estimate_query_mem_mb_usage(query, runner.impalad_conn)
     LOG.info("Finding a starting point for binary search")
     mem_limit = min(mem_estimate, impala.min_impalad_mem_mb) or impala.min_impalad_mem_mb
     while True:
@@ -1699,273 +1587,6 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
   LOG.debug("Query after populating runtime info: %s", query)
 
 
-def estimate_query_mem_mb_usage(query, query_runner):
-  """Runs an explain plan then extracts and returns the estimated memory needed to run
-  the query.
-  """
-  with query_runner.impalad_conn.cursor() as cursor:
-    LOG.debug("Using %s database", query.db_name)
-    if query.db_name:
-      cursor.execute('USE ' + query.db_name)
-    if query.query_type == QueryType.COMPUTE_STATS:
-      # Running "explain" on compute stats is not supported by Impala.
-      return
-    LOG.debug("Explaining query\n%s", query.sql)
-    cursor.execute('EXPLAIN ' + query.sql)
-    explain_rows = cursor.fetchall()
-    explain_lines = [row[0] for row in explain_rows]
-    mem_limit, units = match_memory_estimate(explain_lines)
-    return parse_mem_to_mb(mem_limit, units)
-
-
-def save_runtime_info(path, query, impala):
-  """Updates the file at 'path' with the given query information."""
-  store = None
-  if os.path.exists(path):
-    with open(path) as file:
-      store = json.load(file)
-    _check_store_version(store)
-  if not store:
-    store = {
-        "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
-  with open(path, "w+") as file:
-    store["host_names"] = sorted([i.host_name for i in impala.impalads])
-    queries = store["db_names"].get(query.db_name, dict())
-    query_by_options = queries.get(query.sql, dict())
-    query_by_options[str(sorted(query.options.items()))] = query
-    queries[query.sql] = query_by_options
-    store["db_names"][query.db_name] = queries
-
-    class JsonEncoder(json.JSONEncoder):
-      def default(self, obj):
-        data = dict(obj.__dict__)
-        # Queries are stored by sql, so remove the duplicate data. Also don't store
-        # profiles as JSON values, but instead separately.
-        for k in ("sql", "solo_runtime_profile_with_spilling",
-                  "solo_runtime_profile_without_spilling"):
-          if k in data:
-            del data[k]
-        return data
-    json.dump(
-        store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
-
-
-def load_runtime_info(path, impala=None):
-  """Reads the query runtime information at 'path' and returns a
-  dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
-  instance do not match the data in 'path'.
-  """
-  queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
-  if not os.path.exists(path):
-    return queries_by_db_and_sql
-  with open(path) as file:
-    store = json.load(file)
-    _check_store_version(store)
-    if (
-        impala and
-        store.get("host_names") != sorted([i.host_name for i in impala.impalads])
-    ):
-      return queries_by_db_and_sql
-    for db_name, queries_by_sql in store["db_names"].iteritems():
-      for sql, queries_by_options in queries_by_sql.iteritems():
-        for options, json_query in queries_by_options.iteritems():
-          query = Query()
-          query.__dict__.update(json_query)
-          query.sql = sql
-          queries_by_db_and_sql[db_name][sql][options] = query
-  return queries_by_db_and_sql
-
-
-def _check_store_version(store):
-  """Clears 'store' if the version is too old or raises an error if the version is too
-  new.
-  """
-  if store["version"] < RUNTIME_INFO_FILE_VERSION:
-    LOG.warn("Runtime file info version is old and will be ignored")
-    store.clear()
-  elif store["version"] > RUNTIME_INFO_FILE_VERSION:
-    raise Exception(
-        "Unexpected runtime file info version %s expected %s"
-        % (store["version"], RUNTIME_INFO_FILE_VERSION))
-
-
-def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
-  # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
-  #       when it was used.
-  print(",".join([
-      "Database", "Query",
-      "Old Mem MB w/Spilling",
-      "New Mem MB w/Spilling",
-      "Diff %",
-      "Old Runtime w/Spilling",
-      "New Runtime w/Spilling",
-      "Diff %",
-      "Old Mem MB wout/Spilling",
-      "New Mem MB wout/Spilling",
-      "Diff %",
-      "Old Runtime wout/Spilling",
-      "New Runtime wout/Spilling",
-      "Diff %"]))
-  for db_name, old_queries in old_runtime_info.iteritems():
-    new_queries = new_runtime_info.get(db_name)
-    if not new_queries:
-      continue
-    for sql, old_query in old_queries.iteritems():
-      new_query = new_queries.get(sql)
-      if not new_query:
-        continue
-      sys.stdout.write(old_query["db_name"])
-      sys.stdout.write(",")
-      sys.stdout.write(old_query["name"])
-      sys.stdout.write(",")
-      for attr in [
-          "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
-          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
-      ]:
-        old_value = old_query[attr]
-        sys.stdout.write(str(old_value))
-        sys.stdout.write(",")
-        new_value = new_query[attr]
-        sys.stdout.write(str(new_value))
-        sys.stdout.write(",")
-        if old_value and new_value is not None:
-          sys.stdout.write("%0.2f%%" % (100 * float(new_value - old_value) / old_value))
-        else:
-          sys.stdout.write("N/A")
-        sys.stdout.write(",")
-      print()
-
-
-def generate_DML_queries(cursor, dml_mod_values):
-  """Generate insert, upsert, update, delete DML statements.
-
-  For each table in the database that cursor is connected to, create 4 DML queries
-  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
-  controls which rows will be affected. The generated queries assume that for each table
-  in the database, there exists a table with a '_original' suffix that is never modified.
-
-  This function has some limitations:
-  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
-  2. Requires that the type of the first column of the primary key is an integer type.
-  """
-  LOG.info("Generating DML queries")
-  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
-            if not t.endswith("_original")]
-  result = []
-  for table in tables:
-    if not table.primary_keys:
-      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
-      # table.
-      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
-      continue
-    if len(table.primary_keys) > 1:
-      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
-      LOG.debug("Skipping table '{0}' because it has more than "
-                "1 primary key column.".format(table.name))
-      continue
-    primary_key = table.primary_keys[0]
-    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
-      # We want to be able to apply the modulo operation on the primary key. If the
-      # the first primary key column happens to not be an integer, we will skip
-      # generating queries for this table
-      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
-                "primary key is not an integer.".format(table.name, primary_key.name))
-      continue
-    for mod_value in dml_mod_values:
-      # Insert
-      insert_query = Query()
-      # Populate runtime info for Insert and Upsert queries before Update and Delete
-      # queries because tables remain in original state after running the Insert and
-      # Upsert queries. During the binary search in runtime info population for the
-      # Insert query, we first delete some rows and then reinsert them, so the table
-      # remains in the original state. For the delete, the order is reversed, so the table
-      # is not in the original state after running the the delete (or update) query. This
-      # is why population_order is smaller for Insert and Upsert queries.
-      insert_query.population_order = 1
-      insert_query.query_type = QueryType.INSERT
-      insert_query.name = "insert_{0}".format(table.name)
-      insert_query.db_name = cursor.db_name
-      insert_query.sql = (
-          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
-          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
-      # Upsert
-      upsert_query = Query()
-      upsert_query.population_order = 1
-      upsert_query.query_type = QueryType.UPSERT
-      upsert_query.name = "upsert_{0}".format(table.name)
-      upsert_query.db_name = cursor.db_name
-      upsert_query.sql = (
-          "UPSERT INTO TABLE {0} SELECT * "
-          "FROM {0}_original WHERE {1} % {2} = 0").format(
-              table.name, primary_key.name, mod_value)
-      # Update
-      update_query = Query()
-      update_query.population_order = 2
-      update_query.query_type = QueryType.UPDATE
-      update_query.name = "update_{0}".format(table.name)
-      update_query.db_name = cursor.db_name
-      update_list = ', '.join(
-          'a.{0} = b.{0}'.format(col.name)
-          for col in table.cols if not col.is_primary_key)
-      update_query.sql = (
-          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
-          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
-              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
-              update_list=update_list)
-      # Delete
-      delete_query = Query()
-      delete_query.population_order = 2
-      delete_query.query_type = QueryType.DELETE
-      delete_query.name = "delete_{0}".format(table.name)
-      delete_query.db_name = cursor.db_name
-      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
-          table.name, primary_key.name, mod_value)
-
-      if table.name + "_original" in set(table.name for table in tables):
-        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
-            table.name, primary_key.name, mod_value)
-        upsert_query.set_up_sql = insert_query.set_up_sql
-        update_query.set_up_sql = (
-            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
-            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
-        delete_query.set_up_sql = update_query.set_up_sql
-
-      result.append(insert_query)
-      LOG.debug("Added insert query: {0}".format(insert_query))
-      result.append(update_query)
-      LOG.debug("Added update query: {0}".format(update_query))
-      result.append(upsert_query)
-      LOG.debug("Added upsert query: {0}".format(upsert_query))
-      result.append(delete_query)
-      LOG.debug("Added delete query: {0}".format(delete_query))
-  assert len(result) > 0, "No DML queries were added."
-  return result
-
-
-def generate_compute_stats_queries(cursor):
-  """For each table in the database that cursor is connected to, generate several compute
-  stats queries. Each query will have a different value for the MT_DOP query option.
-  """
-  LOG.info("Generating Compute Stats queries")
-  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
-            if not t.endswith("_original")]
-  result = []
-  mt_dop_values = [str(2**k) for k in range(5)]
-  for table in tables:
-    for mt_dop_value in mt_dop_values:
-      compute_query = Query()
-      compute_query.population_order = 1
-      compute_query.query_type = QueryType.COMPUTE_STATS
-      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
-      compute_query.options["MT_DOP"] = mt_dop_value
-      compute_query.db_name = cursor.db_name
-      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
-          table.name, compute_query.options["MT_DOP"])
-      result.append(compute_query)
-      LOG.debug("Added compute stats query: {0}".format(compute_query))
-  return result
-
-
 def prepare_database(cursor):
   """For each table in the database that cursor is connected to, create an identical copy
   with '_original' suffix. This function is idempotent.
@@ -2054,19 +1675,6 @@ def fetch_and_set_profile(cursor, report):
       LOG.debug("Error getting profile for query with id %s: %s", report.query_id, e)
 
 
-def print_version(cluster):
-  """
-  Print the cluster impalad version info to the console sorted by hostname.
-  """
-  def _sorter(i1, i2):
-    return cmp(i1.host_name, i2.host_name)
-
-  version_info = cluster.impala.get_version_info()
-  print("Cluster Impalad Version Info:")
-  for impalad in sorted(version_info.keys(), cmp=_sorter):
-    print("{0}: {1}".format(impalad.host_name, version_info[impalad]))
-
-
 def main():
   parser = ArgumentParser(
       epilog=dedent("""
@@ -2257,7 +1865,7 @@ def main():
   impala = cluster.impala
   if impala.find_stopped_impalads():
     impala.restart()
-  print_version(cluster)
+  cluster.print_version()
   impala.find_and_set_path_to_running_impalad_binary()
   if args.cancel_current_queries and impala.queries_are_running():
     impala.cancel_queries()
@@ -2340,11 +1948,7 @@ def main():
   # take a really long time to complete. So the queries needs to be validated. Since the
   # runtime info also needs to be collected, that will serve as validation.
   if args.random_db:
-    query_generator = QueryGenerator(DefaultProfile())
-    with impala.cursor(db_name=args.random_db) as cursor:
-      tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
-    queries.extend(load_random_queries_and_populate_runtime_info(
-        query_generator, SqlWriter.create(), tables, impala, converted_args))
+    queries.extend(load_random_queries_and_populate_runtime_info(impala, converted_args))
 
   if args.query_file_path:
     file_queries = load_queries_from_test_file(
diff --git a/tests/stress/queries.py b/tests/stress/queries.py
new file mode 100644
index 0000000..62b7038
--- /dev/null
+++ b/tests/stress/queries.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module implements helpers for representing queries to be executed by the
+# stress test, loading them and generating them.
+
+import logging
+import os
+from textwrap import dedent
+
+from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
+from tests.comparison.query_generator import QueryGenerator
+from tests.comparison.query_profile import DefaultProfile
+from tests.comparison.model_translator import SqlWriter
+from tests.util.parse_util import match_memory_estimate, parse_mem_to_mb
+import tests.util.test_file_parser as test_file_parser
+
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+
+
+class QueryType(object):
+  COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
+
+
+class Query(object):
+  """Contains a SQL statement along with expected runtime information.
+  This class is used as a struct, with the fields filled out by calling
+  classes."""
+
+  def __init__(self):
+    self.name = None
+    self.sql = None
+    # In order to be able to make good estimates for DML queries in the binary search,
+    # we need to bring the table to a good initial state before excuting the sql. Running
+    # set_up_sql accomplishes this task.
+    self.set_up_sql = None
+    self.db_name = None
+    self.result_hash = None
+    self.required_mem_mb_with_spilling = None
+    self.required_mem_mb_without_spilling = None
+    self.solo_runtime_profile_with_spilling = None
+    self.solo_runtime_profile_without_spilling = None
+    self.solo_runtime_secs_with_spilling = None
+    self.solo_runtime_secs_without_spilling = None
+    # Query options to set before running the query.
+    self.options = {}
+    # Determines the order in which we will populate query runtime info. Queries with the
+    # lowest population_order property will be handled first.
+    self.population_order = 0
+    # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
+    # UPSERT, DELETE.
+    self.query_type = QueryType.SELECT
+
+    self._logical_query_id = None
+
+  def __repr__(self):
+    return dedent("""
+        <Query
+        Mem: %(required_mem_mb_with_spilling)s
+        Mem no-spilling: %(required_mem_mb_without_spilling)s
+        Solo Runtime: %(solo_runtime_secs_with_spilling)s
+        Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
+        DB: %(db_name)s
+        Options: %(options)s
+        Set up SQL: %(set_up_sql)s>
+        SQL: %(sql)s>
+        Population order: %(population_order)r>
+        """.strip() % self.__dict__)
+
+  @property
+  def logical_query_id(self):
+    """
+    Return a meanginful unique str identifier for the query.
+
+    Example: "tpcds_300_decimal_parquet_q21"
+    """
+    if self._logical_query_id is None:
+      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+    return self._logical_query_id
+
+  def write_runtime_info_profiles(self, directory):
+    """Write profiles for spilling and non-spilling into directory (str)."""
+    profiles_to_write = [
+        (self.logical_query_id + "_profile_with_spilling.txt",
+         self.solo_runtime_profile_with_spilling),
+        (self.logical_query_id + "_profile_without_spilling.txt",
+         self.solo_runtime_profile_without_spilling),
+    ]
+    for filename, profile in profiles_to_write:
+      if profile is None:
+        LOG.debug("No profile recorded for {0}".format(filename))
+        continue
+      with open(os.path.join(directory, filename), "w") as fh:
+        fh.write(profile)
+
+
+def load_tpc_queries(workload):
+  """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
+  LOG.info("Loading %s queries", workload)
+  queries = []
+  for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
+      include_stress_queries=True).iteritems():
+    query = Query()
+    query.name = query_name
+    query.sql = query_sql
+    queries.append(query)
+  return queries
+
+
+def load_queries_from_test_file(file_path, db_name=None):
+  LOG.debug("Loading queries from %s", file_path)
+  test_cases = test_file_parser.parse_query_test_file(file_path)
+  queries = list()
+  for test_case in test_cases:
+    query = Query()
+    query.sql = test_file_parser.remove_comments(test_case["QUERY"])
+    query.db_name = db_name
+    queries.append(query)
+  return queries
+
+
+def generate_compute_stats_queries(cursor):
+  """For each table in the database that cursor is connected to, generate several compute
+  stats queries. Each query will have a different value for the MT_DOP query option.
+  """
+  LOG.info("Generating Compute Stats queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  mt_dop_values = [str(2**k) for k in range(5)]
+  for table in tables:
+    for mt_dop_value in mt_dop_values:
+      compute_query = Query()
+      compute_query.population_order = 1
+      compute_query.query_type = QueryType.COMPUTE_STATS
+      compute_query.sql = "COMPUTE STATS {0}".format(table.name)
+      compute_query.options["MT_DOP"] = mt_dop_value
+      compute_query.db_name = cursor.db_name
+      compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
+          table.name, compute_query.options["MT_DOP"])
+      result.append(compute_query)
+      LOG.debug("Added compute stats query: {0}".format(compute_query))
+  return result
+
+
+def generate_DML_queries(cursor, dml_mod_values):
+  """Generate insert, upsert, update, delete DML statements.
+
+  For each table in the database that cursor is connected to, create 4 DML queries
+  (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
+  controls which rows will be affected. The generated queries assume that for each table
+  in the database, there exists a table with a '_original' suffix that is never modified.
+
+  This function has some limitations:
+  1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
+  2. Requires that the type of the first column of the primary key is an integer type.
+  """
+  LOG.info("Generating DML queries")
+  tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+            if not t.endswith("_original")]
+  result = []
+  for table in tables:
+    if not table.primary_keys:
+      # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
+      # table.
+      LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
+      continue
+    if len(table.primary_keys) > 1:
+      # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
+      LOG.debug("Skipping table '{0}' because it has more than "
+                "1 primary key column.".format(table.name))
+      continue
+    primary_key = table.primary_keys[0]
+    if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
+      # We want to be able to apply the modulo operation on the primary key. If the
+      # the first primary key column happens to not be an integer, we will skip
+      # generating queries for this table
+      LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
+                "primary key is not an integer.".format(table.name, primary_key.name))
+      continue
+    for mod_value in dml_mod_values:
+      # Insert
+      insert_query = Query()
+      # Populate runtime info for Insert and Upsert queries before Update and Delete
+      # queries because tables remain in original state after running the Insert and
+      # Upsert queries. During the binary search in runtime info population for the
+      # Insert query, we first delete some rows and then reinsert them, so the table
+      # remains in the original state. For the delete, the order is reversed, so the table
+      # is not in the original state after running the the delete (or update) query. This
+      # is why population_order is smaller for Insert and Upsert queries.
+      insert_query.population_order = 1
+      insert_query.query_type = QueryType.INSERT
+      insert_query.name = "insert_{0}".format(table.name)
+      insert_query.db_name = cursor.db_name
+      insert_query.sql = (
+          "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
+          "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+      # Upsert
+      upsert_query = Query()
+      upsert_query.population_order = 1
+      upsert_query.query_type = QueryType.UPSERT
+      upsert_query.name = "upsert_{0}".format(table.name)
+      upsert_query.db_name = cursor.db_name
+      upsert_query.sql = (
+          "UPSERT INTO TABLE {0} SELECT * "
+          "FROM {0}_original WHERE {1} % {2} = 0").format(
+              table.name, primary_key.name, mod_value)
+      # Update
+      update_query = Query()
+      update_query.population_order = 2
+      update_query.query_type = QueryType.UPDATE
+      update_query.name = "update_{0}".format(table.name)
+      update_query.db_name = cursor.db_name
+      update_list = ', '.join(
+          'a.{0} = b.{0}'.format(col.name)
+          for col in table.cols if not col.is_primary_key)
+      update_query.sql = (
+          "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
+          "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
+              table_name=table.name, pk=primary_key.name, mod_value=mod_value,
+              update_list=update_list)
+      # Delete
+      delete_query = Query()
+      delete_query.population_order = 2
+      delete_query.query_type = QueryType.DELETE
+      delete_query.name = "delete_{0}".format(table.name)
+      delete_query.db_name = cursor.db_name
+      delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
+          table.name, primary_key.name, mod_value)
+
+      if table.name + "_original" in set(table.name for table in tables):
+        insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
+            table.name, primary_key.name, mod_value)
+        upsert_query.set_up_sql = insert_query.set_up_sql
+        update_query.set_up_sql = (
+            "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
+            "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+        delete_query.set_up_sql = update_query.set_up_sql
+
+      result.append(insert_query)
+      LOG.debug("Added insert query: {0}".format(insert_query))
+      result.append(update_query)
+      LOG.debug("Added update query: {0}".format(update_query))
+      result.append(upsert_query)
+      LOG.debug("Added upsert query: {0}".format(upsert_query))
+      result.append(delete_query)
+      LOG.debug("Added delete query: {0}".format(delete_query))
+  assert len(result) > 0, "No DML queries were added."
+  return result
+
+
+def generate_random_queries(impala, random_db):
+  """Generator function to produce random queries. 'impala' is the Impala service
+  object. random_db is the name of the database that queries should be
+  generated for."""
+  with impala.cursor(db_name=random_db) as cursor:
+    tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
+  query_generator = QueryGenerator(DefaultProfile())
+  model_translator = SqlWriter.create()
+  while True:
+    query_model = query_generator.generate_statement(tables)
+    sql = model_translator.write_query(query_model)
+    query = Query()
+    query.sql = sql
+    query.db_name = random_db
+    yield query
+
+
+def estimate_query_mem_mb_usage(query, impalad_conn):
+  """Runs an explain plan then extracts and returns the estimated memory needed to run
+  the query.
+  """
+  with impalad_conn.cursor() as cursor:
+    LOG.debug("Using %s database", query.db_name)
+    if query.db_name:
+      cursor.execute('USE ' + query.db_name)
+    if query.query_type == QueryType.COMPUTE_STATS:
+      # Running "explain" on compute stats is not supported by Impala.
+      return
+    LOG.debug("Explaining query\n%s", query.sql)
+    cursor.execute('EXPLAIN ' + query.sql)
+    explain_rows = cursor.fetchall()
+    explain_lines = [row[0] for row in explain_rows]
+    mem_limit, units = match_memory_estimate(explain_lines)
+    return parse_mem_to_mb(mem_limit, units)
diff --git a/tests/stress/runtime_info.py b/tests/stress/runtime_info.py
new file mode 100644
index 0000000..6cba385
--- /dev/null
+++ b/tests/stress/runtime_info.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Utility functions used by the stress test to save and load runtime info
+# about queries to and from JSON files.
+
+from collections import defaultdict
+import json
+import logging
+import os
+import sys
+
+from tests.stress.queries import Query
+
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+
+# The version of the file format containing the collected query runtime info.
+RUNTIME_INFO_FILE_VERSION = 3
+
+
+def save_runtime_info(path, query, impala):
+  """Updates the file at 'path' with the given query information."""
+  store = None
+  if os.path.exists(path):
+    with open(path) as file:
+      store = json.load(file)
+    _check_store_version(store)
+  if not store:
+    store = {
+        "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
+  with open(path, "w+") as file:
+    store["host_names"] = sorted([i.host_name for i in impala.impalads])
+    queries = store["db_names"].get(query.db_name, dict())
+    query_by_options = queries.get(query.sql, dict())
+    query_by_options[str(sorted(query.options.items()))] = query
+    queries[query.sql] = query_by_options
+    store["db_names"][query.db_name] = queries
+
+    class JsonEncoder(json.JSONEncoder):
+      def default(self, obj):
+        data = dict(obj.__dict__)
+        # Queries are stored by sql, so remove the duplicate data. Also don't store
+        # profiles as JSON values, but instead separately.
+        for k in ("sql", "solo_runtime_profile_with_spilling",
+                  "solo_runtime_profile_without_spilling"):
+          if k in data:
+            del data[k]
+        return data
+    json.dump(
+        store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
+
+
+def load_runtime_info(path, impala=None):
+  """Reads the query runtime information at 'path' and returns a
+  dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
+  instance do not match the data in 'path'.
+  """
+  queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
+  if not os.path.exists(path):
+    return queries_by_db_and_sql
+  with open(path) as file:
+    store = json.load(file)
+    _check_store_version(store)
+    if (
+        impala and
+        store.get("host_names") != sorted([i.host_name for i in impala.impalads])
+    ):
+      return queries_by_db_and_sql
+    for db_name, queries_by_sql in store["db_names"].iteritems():
+      for sql, queries_by_options in queries_by_sql.iteritems():
+        for options, json_query in queries_by_options.iteritems():
+          query = Query()
+          query.__dict__.update(json_query)
+          query.sql = sql
+          queries_by_db_and_sql[db_name][sql][options] = query
+  return queries_by_db_and_sql
+
+
+def _check_store_version(store):
+  """Clears 'store' if the version is too old or raises an error if the version is too
+  new.
+  """
+  if store["version"] < RUNTIME_INFO_FILE_VERSION:
+    LOG.warn("Runtime file info version is old and will be ignored")
+    store.clear()
+  elif store["version"] > RUNTIME_INFO_FILE_VERSION:
+    raise Exception(
+        "Unexpected runtime file info version %s expected %s"
+        % (store["version"], RUNTIME_INFO_FILE_VERSION))
+
+
+def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
+  # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
+  #       when it was used.
+  print(",".join([
+      "Database", "Query",
+      "Old Mem MB w/Spilling",
+      "New Mem MB w/Spilling",
+      "Diff %",
+      "Old Runtime w/Spilling",
+      "New Runtime w/Spilling",
+      "Diff %",
+      "Old Mem MB wout/Spilling",
+      "New Mem MB wout/Spilling",
+      "Diff %",
+      "Old Runtime wout/Spilling",
+      "New Runtime wout/Spilling",
+      "Diff %"]))
+  for db_name, old_queries in old_runtime_info.iteritems():
+    new_queries = new_runtime_info.get(db_name)
+    if not new_queries:
+      continue
+    for sql, old_query in old_queries.iteritems():
+      new_query = new_queries.get(sql)
+      if not new_query:
+        continue
+      sys.stdout.write(old_query["db_name"])
+      sys.stdout.write(",")
+      sys.stdout.write(old_query["name"])
+      sys.stdout.write(",")
+      for attr in [
+          "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
+          "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
+      ]:
+        old_value = old_query[attr]
+        sys.stdout.write(str(old_value))
+        sys.stdout.write(",")
+        new_value = new_query[attr]
+        sys.stdout.write(str(new_value))
+        sys.stdout.write(",")
+        if old_value and new_value is not None:
+          sys.stdout.write("%0.2f%%" % (100 * float(new_value - old_value) / old_value))
+        else:
+          sys.stdout.write("N/A")
+        sys.stdout.write(",")
+      print()