Posted to commits@impala.apache.org by kw...@apache.org on 2019/02/27 04:12:36 UTC
[impala] 01/03: [stress] pull out query and runtime info loading
This is an automated email from the ASF dual-hosted git repository.
kwho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0cc9371f0fce8b1c341d9cf034afe18590246070
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Sun Feb 24 23:23:24 2019 -0800
[stress] pull out query and runtime info loading
This is an incremental step to reduce the size of concurrent_select.py.
It is now "only" 2019 lines long.
Testing:
Ran various command-line invocations locally, including random and
DML queries. Running a cluster stress test with the modifications.
Change-Id: If65069cd1678cdf71091bd2601bc1fc1d745cec5
Reviewed-on: http://gerrit.cloudera.org:8080/12576
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
tests/comparison/cluster.py | 12 ++
tests/stress/concurrent_select.py | 424 ++------------------------------------
tests/stress/queries.py | 301 +++++++++++++++++++++++++++
tests/stress/runtime_info.py | 152 ++++++++++++++
4 files changed, 479 insertions(+), 410 deletions(-)
diff --git a/tests/comparison/cluster.py b/tests/comparison/cluster.py
index 33f29a9..2f02e10 100644
--- a/tests/comparison/cluster.py
+++ b/tests/comparison/cluster.py
@@ -171,6 +171,18 @@ class Cluster(object):
self._init_impala()
return self._impala
+ def print_version(self):
+ """
+ Print the cluster impalad version info to the console sorted by hostname.
+ """
+ def _sorter(i1, i2):
+ return cmp(i1.host_name, i2.host_name)
+
+ version_info = self.impala.get_version_info()
+ print("Cluster Impalad Version Info:")
+ for impalad in sorted(version_info.keys(), cmp=_sorter):
+ print("{0}: {1}".format(impalad.host_name, version_info[impalad]))
+
class MiniCluster(Cluster):
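Note that print_version sorts with the cmp= parameter of sorted(), which only exists in Python 2 (consistent with the iteritems() calls elsewhere in this patch). A minimal sketch of the same ordering under Python 3, assuming get_version_info() keeps returning a dict keyed by impalad objects, would swap the comparator for a key function:

    from operator import attrgetter

    def print_version(cluster):
        # sorted() dropped the cmp= parameter in Python 3; attrgetter gives
        # the equivalent ordering by host name.
        version_info = cluster.impala.get_version_info()
        print("Cluster Impalad Version Info:")
        for impalad in sorted(version_info, key=attrgetter("host_name")):
            print("{0}: {1}".format(impalad.host_name, version_info[impalad]))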
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 67c5628..837b964 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -57,7 +57,6 @@
from __future__ import print_function
-import json
import logging
import os
import re
@@ -80,15 +79,15 @@ from threading import current_thread, Thread
from time import sleep, time
import tests.comparison.cli_options as cli_options
-import tests.util.test_file_parser as test_file_parser
from tests.comparison.cluster import Timeout
from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
-from tests.comparison.model_translator import SqlWriter
-from tests.comparison.query_generator import QueryGenerator
-from tests.comparison.query_profile import DefaultProfile
+from tests.stress.runtime_info import save_runtime_info, load_runtime_info
+from tests.stress.queries import (QueryType, generate_compute_stats_queries,
+ generate_DML_queries, generate_random_queries, load_tpc_queries,
+ load_queries_from_test_file, estimate_query_mem_mb_usage)
from tests.util.parse_util import (
EXPECTED_TPCDS_QUERIES_COUNT, EXPECTED_TPCH_NESTED_QUERIES_COUNT,
- EXPECTED_TPCH_STRESS_QUERIES_COUNT, match_memory_estimate, parse_mem_to_mb)
+ EXPECTED_TPCH_STRESS_QUERIES_COUNT)
from tests.util.thrift_util import op_handle_to_query_id
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
@@ -96,9 +95,6 @@ LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
PROFILES_DIR = "profiles"
RESULT_HASHES_DIR = "result_hashes"
-# The version of the file format containing the collected query runtime info.
-RUNTIME_INFO_FILE_VERSION = 3
-
# Metrics collected during the stress running process.
NUM_QUERIES_DEQUEUED = "num_queries_dequeued"
# The number of queries that were submitted to a query runner.
@@ -1024,80 +1020,6 @@ class QueryTimeout(Exception):
pass
-class QueryType(object):
- COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
-
-
-class Query(object):
- """Contains a SQL statement along with expected runtime information."""
-
- def __init__(self):
- self.name = None
- self.sql = None
- # In order to be able to make good estimates for DML queries in the binary search,
- # we need to bring the table to a good initial state before executing the sql. Running
- # set_up_sql accomplishes this task.
- self.set_up_sql = None
- self.db_name = None
- self.result_hash = None
- self.required_mem_mb_with_spilling = None
- self.required_mem_mb_without_spilling = None
- self.solo_runtime_profile_with_spilling = None
- self.solo_runtime_profile_without_spilling = None
- self.solo_runtime_secs_with_spilling = None
- self.solo_runtime_secs_without_spilling = None
- # Query options to set before running the query.
- self.options = {}
- # Determines the order in which we will populate query runtime info. Queries with the
- # lowest population_order property will be handled first.
- self.population_order = 0
- # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
- # UPSERT, DELETE.
- self.query_type = QueryType.SELECT
-
- self._logical_query_id = None
-
- def __repr__(self):
- return dedent("""
- <Query
- Mem: %(required_mem_mb_with_spilling)s
- Mem no-spilling: %(required_mem_mb_without_spilling)s
- Solo Runtime: %(solo_runtime_secs_with_spilling)s
- Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
- DB: %(db_name)s
- Options: %(options)s
- Set up SQL: %(set_up_sql)s>
- SQL: %(sql)s>
- Population order: %(population_order)r>
- """.strip() % self.__dict__)
-
- @property
- def logical_query_id(self):
- """
- Return a meaningful unique str identifier for the query.
-
- Example: "tpcds_300_decimal_parquet_q21"
- """
- if self._logical_query_id is None:
- self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
- return self._logical_query_id
-
- def write_runtime_info_profiles(self, directory):
- """Write profiles for spilling and non-spilling into directory (str)."""
- profiles_to_write = [
- (self.logical_query_id + "_profile_with_spilling.txt",
- self.solo_runtime_profile_with_spilling),
- (self.logical_query_id + "_profile_without_spilling.txt",
- self.solo_runtime_profile_without_spilling),
- ]
- for filename, profile in profiles_to_write:
- if profile is None:
- LOG.debug("No profile recorded for {0}".format(filename))
- continue
- with open(os.path.join(directory, filename), "w") as fh:
- fh.write(profile)
-
-
class QueryRunner(object):
"""Encapsulates functionality to run a query and provide a runtime report."""
@@ -1401,54 +1323,20 @@ class QueryRunner(object):
return hash_thread.result
-def load_tpc_queries(workload):
- """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
- LOG.info("Loading %s queries", workload)
- queries = []
- for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
- include_stress_queries=True).iteritems():
- query = Query()
- query.name = query_name
- query.sql = query_sql
- queries.append(query)
- return queries
-
-
-def load_queries_from_test_file(file_path, db_name=None):
- LOG.debug("Loading queries from %s", file_path)
- test_cases = test_file_parser.parse_query_test_file(file_path)
- queries = list()
- for test_case in test_cases:
- query = Query()
- query.sql = test_file_parser.remove_comments(test_case["QUERY"])
- query.db_name = db_name
- queries.append(query)
- return queries
-
-
-def load_random_queries_and_populate_runtime_info(
- query_generator, model_translator, tables, impala, converted_args
-):
+def load_random_queries_and_populate_runtime_info(impala, converted_args):
"""Returns a list of random queries. Each query will also have its runtime info
populated. The runtime info population also serves to validate the query.
"""
LOG.info("Generating random queries")
-
- def generate_candidates():
- while True:
- query_model = query_generator.generate_statement(tables)
- sql = model_translator.write_query(query_model)
- query = Query()
- query.sql = sql
- query.db_name = converted_args.random_db
- yield query
return populate_runtime_info_for_random_queries(
- impala, generate_candidates(), converted_args)
+ impala, generate_random_queries(impala, converted_args.random_db), converted_args)
def populate_runtime_info_for_random_queries(impala, candidate_queries, converted_args):
- """Returns a list of random queries. Each query will also have its runtime info
- populated. The runtime info population also serves to validate the query.
+ """Returns a list of random queries selected from candidate queries, which should be
+ a generator that will return an unlimited number of randomly generated queries.
+ Each query will also have its runtime info populated. The runtime info population
+ also serves to validate the query.
"""
start_time = datetime.now()
queries = list()
@@ -1591,7 +1479,7 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
return reports[len(reports) / 2]
if not any((old_required_mem_mb_with_spilling, old_required_mem_mb_without_spilling)):
- mem_estimate = estimate_query_mem_mb_usage(query, runner)
+ mem_estimate = estimate_query_mem_mb_usage(query, runner.impalad_conn)
LOG.info("Finding a starting point for binary search")
mem_limit = min(mem_estimate, impala.min_impalad_mem_mb) or impala.min_impalad_mem_mb
while True:
@@ -1699,273 +1587,6 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
LOG.debug("Query after populating runtime info: %s", query)
-def estimate_query_mem_mb_usage(query, query_runner):
- """Runs an explain plan then extracts and returns the estimated memory needed to run
- the query.
- """
- with query_runner.impalad_conn.cursor() as cursor:
- LOG.debug("Using %s database", query.db_name)
- if query.db_name:
- cursor.execute('USE ' + query.db_name)
- if query.query_type == QueryType.COMPUTE_STATS:
- # Running "explain" on compute stats is not supported by Impala.
- return
- LOG.debug("Explaining query\n%s", query.sql)
- cursor.execute('EXPLAIN ' + query.sql)
- explain_rows = cursor.fetchall()
- explain_lines = [row[0] for row in explain_rows]
- mem_limit, units = match_memory_estimate(explain_lines)
- return parse_mem_to_mb(mem_limit, units)
-
-
-def save_runtime_info(path, query, impala):
- """Updates the file at 'path' with the given query information."""
- store = None
- if os.path.exists(path):
- with open(path) as file:
- store = json.load(file)
- _check_store_version(store)
- if not store:
- store = {
- "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
- with open(path, "w+") as file:
- store["host_names"] = sorted([i.host_name for i in impala.impalads])
- queries = store["db_names"].get(query.db_name, dict())
- query_by_options = queries.get(query.sql, dict())
- query_by_options[str(sorted(query.options.items()))] = query
- queries[query.sql] = query_by_options
- store["db_names"][query.db_name] = queries
-
- class JsonEncoder(json.JSONEncoder):
- def default(self, obj):
- data = dict(obj.__dict__)
- # Queries are stored by sql, so remove the duplicate data. Also don't store
- # profiles as JSON values, but instead separately.
- for k in ("sql", "solo_runtime_profile_with_spilling",
- "solo_runtime_profile_without_spilling"):
- if k in data:
- del data[k]
- return data
- json.dump(
- store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
-
-
-def load_runtime_info(path, impala=None):
- """Reads the query runtime information at 'path' and returns a
- dict<db_name, dict<sql, dict<options, Query>>>. Returns an empty dict if the hosts in
- the 'impala' instance do not match the data in 'path'.
- """
- queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
- if not os.path.exists(path):
- return queries_by_db_and_sql
- with open(path) as file:
- store = json.load(file)
- _check_store_version(store)
- if (
- impala and
- store.get("host_names") != sorted([i.host_name for i in impala.impalads])
- ):
- return queries_by_db_and_sql
- for db_name, queries_by_sql in store["db_names"].iteritems():
- for sql, queries_by_options in queries_by_sql.iteritems():
- for options, json_query in queries_by_options.iteritems():
- query = Query()
- query.__dict__.update(json_query)
- query.sql = sql
- queries_by_db_and_sql[db_name][sql][options] = query
- return queries_by_db_and_sql
-
-
-def _check_store_version(store):
- """Clears 'store' if the version is too old or raises an error if the version is too
- new.
- """
- if store["version"] < RUNTIME_INFO_FILE_VERSION:
- LOG.warn("Runtime file info version is old and will be ignored")
- store.clear()
- elif store["version"] > RUNTIME_INFO_FILE_VERSION:
- raise Exception(
- "Unexpected runtime file info version %s expected %s"
- % (store["version"], RUNTIME_INFO_FILE_VERSION))
-
-
-def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
- # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
- # when it was used.
- print(",".join([
- "Database", "Query",
- "Old Mem MB w/Spilling",
- "New Mem MB w/Spilling",
- "Diff %",
- "Old Runtime w/Spilling",
- "New Runtime w/Spilling",
- "Diff %",
- "Old Mem MB wout/Spilling",
- "New Mem MB wout/Spilling",
- "Diff %",
- "Old Runtime wout/Spilling",
- "New Runtime wout/Spilling",
- "Diff %"]))
- for db_name, old_queries in old_runtime_info.iteritems():
- new_queries = new_runtime_info.get(db_name)
- if not new_queries:
- continue
- for sql, old_query in old_queries.iteritems():
- new_query = new_queries.get(sql)
- if not new_query:
- continue
- sys.stdout.write(old_query["db_name"])
- sys.stdout.write(",")
- sys.stdout.write(old_query["name"])
- sys.stdout.write(",")
- for attr in [
- "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
- "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
- ]:
- old_value = old_query[attr]
- sys.stdout.write(str(old_value))
- sys.stdout.write(",")
- new_value = new_query[attr]
- sys.stdout.write(str(new_value))
- sys.stdout.write(",")
- if old_value and new_value is not None:
- sys.stdout.write("%0.2f%%" % (100 * float(new_value - old_value) / old_value))
- else:
- sys.stdout.write("N/A")
- sys.stdout.write(",")
- print()
-
-
-def generate_DML_queries(cursor, dml_mod_values):
- """Generate insert, upsert, update, delete DML statements.
-
- For each table in the database that cursor is connected to, create 4 DML queries
- (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
- controls which rows will be affected. The generated queries assume that for each table
- in the database, there exists a table with a '_original' suffix that is never modified.
-
- This function has some limitations:
- 1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
- 2. Requires that the type of the first column of the primary key is an integer type.
- """
- LOG.info("Generating DML queries")
- tables = [cursor.describe_table(t) for t in cursor.list_table_names()
- if not t.endswith("_original")]
- result = []
- for table in tables:
- if not table.primary_keys:
- # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
- # table.
- LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
- continue
- if len(table.primary_keys) > 1:
- # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
- LOG.debug("Skipping table '{0}' because it has more than "
- "1 primary key column.".format(table.name))
- continue
- primary_key = table.primary_keys[0]
- if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
- # We want to be able to apply the modulo operation on the primary key. If the
- # first primary key column happens to not be an integer, we will skip
- # generating queries for this table.
- LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
- "primary key is not an integer.".format(table.name, primary_key.name))
- continue
- for mod_value in dml_mod_values:
- # Insert
- insert_query = Query()
- # Populate runtime info for Insert and Upsert queries before Update and Delete
- # queries because tables remain in original state after running the Insert and
- # Upsert queries. During the binary search in runtime info population for the
- # Insert query, we first delete some rows and then reinsert them, so the table
- # remains in the original state. For the delete, the order is reversed, so the table
- is not in the original state after running the delete (or update) query. This
- # is why population_order is smaller for Insert and Upsert queries.
- insert_query.population_order = 1
- insert_query.query_type = QueryType.INSERT
- insert_query.name = "insert_{0}".format(table.name)
- insert_query.db_name = cursor.db_name
- insert_query.sql = (
- "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
- "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
- # Upsert
- upsert_query = Query()
- upsert_query.population_order = 1
- upsert_query.query_type = QueryType.UPSERT
- upsert_query.name = "upsert_{0}".format(table.name)
- upsert_query.db_name = cursor.db_name
- upsert_query.sql = (
- "UPSERT INTO TABLE {0} SELECT * "
- "FROM {0}_original WHERE {1} % {2} = 0").format(
- table.name, primary_key.name, mod_value)
- # Update
- update_query = Query()
- update_query.population_order = 2
- update_query.query_type = QueryType.UPDATE
- update_query.name = "update_{0}".format(table.name)
- update_query.db_name = cursor.db_name
- update_list = ', '.join(
- 'a.{0} = b.{0}'.format(col.name)
- for col in table.cols if not col.is_primary_key)
- update_query.sql = (
- "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
- "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
- table_name=table.name, pk=primary_key.name, mod_value=mod_value,
- update_list=update_list)
- # Delete
- delete_query = Query()
- delete_query.population_order = 2
- delete_query.query_type = QueryType.DELETE
- delete_query.name = "delete_{0}".format(table.name)
- delete_query.db_name = cursor.db_name
- delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
- table.name, primary_key.name, mod_value)
-
- if table.name + "_original" in set(table.name for table in tables):
- insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
- table.name, primary_key.name, mod_value)
- upsert_query.set_up_sql = insert_query.set_up_sql
- update_query.set_up_sql = (
- "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
- "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
- delete_query.set_up_sql = update_query.set_up_sql
-
- result.append(insert_query)
- LOG.debug("Added insert query: {0}".format(insert_query))
- result.append(update_query)
- LOG.debug("Added update query: {0}".format(update_query))
- result.append(upsert_query)
- LOG.debug("Added upsert query: {0}".format(upsert_query))
- result.append(delete_query)
- LOG.debug("Added delete query: {0}".format(delete_query))
- assert len(result) > 0, "No DML queries were added."
- return result
-
-
-def generate_compute_stats_queries(cursor):
- """For each table in the database that cursor is connected to, generate several compute
- stats queries. Each query will have a different value for the MT_DOP query option.
- """
- LOG.info("Generating Compute Stats queries")
- tables = [cursor.describe_table(t) for t in cursor.list_table_names()
- if not t.endswith("_original")]
- result = []
- mt_dop_values = [str(2**k) for k in range(5)]
- for table in tables:
- for mt_dop_value in mt_dop_values:
- compute_query = Query()
- compute_query.population_order = 1
- compute_query.query_type = QueryType.COMPUTE_STATS
- compute_query.sql = "COMPUTE STATS {0}".format(table.name)
- compute_query.options["MT_DOP"] = mt_dop_value
- compute_query.db_name = cursor.db_name
- compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
- table.name, compute_query.options["MT_DOP"])
- result.append(compute_query)
- LOG.debug("Added compute stats query: {0}".format(compute_query))
- return result
-
-
def prepare_database(cursor):
"""For each table in the database that cursor is connected to, create an identical copy
with '_original' suffix. This function is idempotent.
@@ -2054,19 +1675,6 @@ def fetch_and_set_profile(cursor, report):
LOG.debug("Error getting profile for query with id %s: %s", report.query_id, e)
-def print_version(cluster):
- """
- Print the cluster impalad version info to the console sorted by hostname.
- """
- def _sorter(i1, i2):
- return cmp(i1.host_name, i2.host_name)
-
- version_info = cluster.impala.get_version_info()
- print("Cluster Impalad Version Info:")
- for impalad in sorted(version_info.keys(), cmp=_sorter):
- print("{0}: {1}".format(impalad.host_name, version_info[impalad]))
-
-
def main():
parser = ArgumentParser(
epilog=dedent("""
@@ -2257,7 +1865,7 @@ def main():
impala = cluster.impala
if impala.find_stopped_impalads():
impala.restart()
- print_version(cluster)
+ cluster.print_version()
impala.find_and_set_path_to_running_impalad_binary()
if args.cancel_current_queries and impala.queries_are_running():
impala.cancel_queries()
@@ -2340,11 +1948,7 @@ def main():
# take a really long time to complete. So the queries need to be validated. Since the
# runtime info also needs to be collected, that will serve as validation.
if args.random_db:
- query_generator = QueryGenerator(DefaultProfile())
- with impala.cursor(db_name=args.random_db) as cursor:
- tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
- queries.extend(load_random_queries_and_populate_runtime_info(
- query_generator, SqlWriter.create(), tables, impala, converted_args))
+ queries.extend(load_random_queries_and_populate_runtime_info(impala, converted_args))
if args.query_file_path:
file_queries = load_queries_from_test_file(
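With the move complete, callers import the loaders from tests.stress.queries instead of defining them in concurrent_select.py. A hedged sketch of the new import surface; the .test file path below is purely illustrative:

    from tests.stress.queries import (
        load_tpc_queries, load_queries_from_test_file)

    queries = load_tpc_queries("tpch")
    # Hypothetical path; any Impala query .test file works here.
    queries.extend(load_queries_from_test_file(
        "testdata/workloads/sample.test", db_name="tpch"))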
diff --git a/tests/stress/queries.py b/tests/stress/queries.py
new file mode 100644
index 0000000..62b7038
--- /dev/null
+++ b/tests/stress/queries.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module implements helpers for representing queries to be executed by the
+# stress test, loading them and generating them.
+
+import logging
+import os
+from textwrap import dedent
+
+from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
+from tests.comparison.query_generator import QueryGenerator
+from tests.comparison.query_profile import DefaultProfile
+from tests.comparison.model_translator import SqlWriter
+from tests.util.parse_util import match_memory_estimate, parse_mem_to_mb
+import tests.util.test_file_parser as test_file_parser
+
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+
+
+class QueryType(object):
+ COMPUTE_STATS, DELETE, INSERT, SELECT, UPDATE, UPSERT = range(6)
+
+
+class Query(object):
+ """Contains a SQL statement along with expected runtime information.
+ This class is used as a struct, with the fields filled in by callers."""
+
+ def __init__(self):
+ self.name = None
+ self.sql = None
+ # In order to be able to make good estimates for DML queries in the binary search,
+ # we need to bring the table to a good initial state before executing the sql. Running
+ # set_up_sql accomplishes this task.
+ self.set_up_sql = None
+ self.db_name = None
+ self.result_hash = None
+ self.required_mem_mb_with_spilling = None
+ self.required_mem_mb_without_spilling = None
+ self.solo_runtime_profile_with_spilling = None
+ self.solo_runtime_profile_without_spilling = None
+ self.solo_runtime_secs_with_spilling = None
+ self.solo_runtime_secs_without_spilling = None
+ # Query options to set before running the query.
+ self.options = {}
+ # Determines the order in which we will populate query runtime info. Queries with the
+ # lowest population_order property will be handled first.
+ self.population_order = 0
+ # Type of query. Can have the following values: SELECT, COMPUTE_STATS, INSERT, UPDATE,
+ # UPSERT, DELETE.
+ self.query_type = QueryType.SELECT
+
+ self._logical_query_id = None
+
+ def __repr__(self):
+ return dedent("""
+ <Query
+ Mem: %(required_mem_mb_with_spilling)s
+ Mem no-spilling: %(required_mem_mb_without_spilling)s
+ Solo Runtime: %(solo_runtime_secs_with_spilling)s
+ Solo Runtime no-spilling: %(solo_runtime_secs_without_spilling)s
+ DB: %(db_name)s
+ Options: %(options)s
+ Set up SQL: %(set_up_sql)s>
+ SQL: %(sql)s>
+ Population order: %(population_order)r>
+ """.strip() % self.__dict__)
+
+ @property
+ def logical_query_id(self):
+ """
+ Return a meaningful unique str identifier for the query.
+
+ Example: "tpcds_300_decimal_parquet_q21"
+ """
+ if self._logical_query_id is None:
+ self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+ return self._logical_query_id
+
+ def write_runtime_info_profiles(self, directory):
+ """Write profiles for spilling and non-spilling into directory (str)."""
+ profiles_to_write = [
+ (self.logical_query_id + "_profile_with_spilling.txt",
+ self.solo_runtime_profile_with_spilling),
+ (self.logical_query_id + "_profile_without_spilling.txt",
+ self.solo_runtime_profile_without_spilling),
+ ]
+ for filename, profile in profiles_to_write:
+ if profile is None:
+ LOG.debug("No profile recorded for {0}".format(filename))
+ continue
+ with open(os.path.join(directory, filename), "w") as fh:
+ fh.write(profile)
+
+
+def load_tpc_queries(workload):
+ """Returns a list of TPC queries. 'workload' should either be 'tpch' or 'tpcds'."""
+ LOG.info("Loading %s queries", workload)
+ queries = []
+ for query_name, query_sql in test_file_parser.load_tpc_queries(workload,
+ include_stress_queries=True).iteritems():
+ query = Query()
+ query.name = query_name
+ query.sql = query_sql
+ queries.append(query)
+ return queries
+
+
+def load_queries_from_test_file(file_path, db_name=None):
+ LOG.debug("Loading queries from %s", file_path)
+ test_cases = test_file_parser.parse_query_test_file(file_path)
+ queries = list()
+ for test_case in test_cases:
+ query = Query()
+ query.sql = test_file_parser.remove_comments(test_case["QUERY"])
+ query.db_name = db_name
+ queries.append(query)
+ return queries
+
+
+def generate_compute_stats_queries(cursor):
+ """For each table in the database that cursor is connected to, generate several compute
+ stats queries. Each query will have a different value for the MT_DOP query option.
+ """
+ LOG.info("Generating Compute Stats queries")
+ tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+ if not t.endswith("_original")]
+ result = []
+ mt_dop_values = [str(2**k) for k in range(5)]
+ for table in tables:
+ for mt_dop_value in mt_dop_values:
+ compute_query = Query()
+ compute_query.population_order = 1
+ compute_query.query_type = QueryType.COMPUTE_STATS
+ compute_query.sql = "COMPUTE STATS {0}".format(table.name)
+ compute_query.options["MT_DOP"] = mt_dop_value
+ compute_query.db_name = cursor.db_name
+ compute_query.name = "compute_stats_{0}_mt_dop_{1}".format(
+ table.name, compute_query.options["MT_DOP"])
+ result.append(compute_query)
+ LOG.debug("Added compute stats query: {0}".format(compute_query))
+ return result
+
+
+def generate_DML_queries(cursor, dml_mod_values):
+ """Generate insert, upsert, update, delete DML statements.
+
+ For each table in the database that cursor is connected to, create 4 DML queries
+ (insert, upsert, update, delete) for each mod value in 'dml_mod_values'. This value
+ controls which rows will be affected. The generated queries assume that for each table
+ in the database, there exists a table with a '_original' suffix that is never modified.
+
+ This function has some limitations:
+ 1. Only generates DML statements against Kudu tables, and ignores non-Kudu tables.
+ 2. Requires that the type of the first column of the primary key is an integer type.
+ """
+ LOG.info("Generating DML queries")
+ tables = [cursor.describe_table(t) for t in cursor.list_table_names()
+ if not t.endswith("_original")]
+ result = []
+ for table in tables:
+ if not table.primary_keys:
+ # Skip non-Kudu tables. If a table has no primary keys, then it cannot be a Kudu
+ # table.
+ LOG.debug("Skipping table '{0}' because it has no primary keys.".format(table.name))
+ continue
+ if len(table.primary_keys) > 1:
+ # TODO(IMPALA-4665): Add support for tables with multiple primary keys.
+ LOG.debug("Skipping table '{0}' because it has more than "
+ "1 primary key column.".format(table.name))
+ continue
+ primary_key = table.primary_keys[0]
+ if primary_key.exact_type not in (Int, TinyInt, SmallInt, BigInt):
+ # We want to be able to apply the modulo operation on the primary key. If the
+ # first primary key column happens to not be an integer, we will skip
+ # generating queries for this table.
+ LOG.debug("Skipping table '{0}' because the first column '{1}' in the "
+ "primary key is not an integer.".format(table.name, primary_key.name))
+ continue
+ for mod_value in dml_mod_values:
+ # Insert
+ insert_query = Query()
+ # Populate runtime info for Insert and Upsert queries before Update and Delete
+ # queries because tables remain in original state after running the Insert and
+ # Upsert queries. During the binary search in runtime info population for the
+ # Insert query, we first delete some rows and then reinsert them, so the table
+ # remains in the original state. For the delete, the order is reversed, so the table
+ is not in the original state after running the delete (or update) query. This
+ # is why population_order is smaller for Insert and Upsert queries.
+ insert_query.population_order = 1
+ insert_query.query_type = QueryType.INSERT
+ insert_query.name = "insert_{0}".format(table.name)
+ insert_query.db_name = cursor.db_name
+ insert_query.sql = (
+ "INSERT INTO TABLE {0} SELECT * FROM {0}_original "
+ "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+ # Upsert
+ upsert_query = Query()
+ upsert_query.population_order = 1
+ upsert_query.query_type = QueryType.UPSERT
+ upsert_query.name = "upsert_{0}".format(table.name)
+ upsert_query.db_name = cursor.db_name
+ upsert_query.sql = (
+ "UPSERT INTO TABLE {0} SELECT * "
+ "FROM {0}_original WHERE {1} % {2} = 0").format(
+ table.name, primary_key.name, mod_value)
+ # Update
+ update_query = Query()
+ update_query.population_order = 2
+ update_query.query_type = QueryType.UPDATE
+ update_query.name = "update_{0}".format(table.name)
+ update_query.db_name = cursor.db_name
+ update_list = ', '.join(
+ 'a.{0} = b.{0}'.format(col.name)
+ for col in table.cols if not col.is_primary_key)
+ update_query.sql = (
+ "UPDATE a SET {update_list} FROM {table_name} a JOIN {table_name}_original b "
+ "ON a.{pk} = b.{pk} + 1 WHERE a.{pk} % {mod_value} = 0").format(
+ table_name=table.name, pk=primary_key.name, mod_value=mod_value,
+ update_list=update_list)
+ # Delete
+ delete_query = Query()
+ delete_query.population_order = 2
+ delete_query.query_type = QueryType.DELETE
+ delete_query.name = "delete_{0}".format(table.name)
+ delete_query.db_name = cursor.db_name
+ delete_query.sql = ("DELETE FROM {0} WHERE {1} % {2} = 0").format(
+ table.name, primary_key.name, mod_value)
+
+ if table.name + "_original" in set(table.name for table in tables):
+ insert_query.set_up_sql = "DELETE FROM {0} WHERE {1} % {2} = 0".format(
+ table.name, primary_key.name, mod_value)
+ upsert_query.set_up_sql = insert_query.set_up_sql
+ update_query.set_up_sql = (
+ "UPSERT INTO TABLE {0} SELECT * FROM {0}_original "
+ "WHERE {1} % {2} = 0").format(table.name, primary_key.name, mod_value)
+ delete_query.set_up_sql = update_query.set_up_sql
+
+ result.append(insert_query)
+ LOG.debug("Added insert query: {0}".format(insert_query))
+ result.append(update_query)
+ LOG.debug("Added update query: {0}".format(update_query))
+ result.append(upsert_query)
+ LOG.debug("Added upsert query: {0}".format(upsert_query))
+ result.append(delete_query)
+ LOG.debug("Added delete query: {0}".format(delete_query))
+ assert len(result) > 0, "No DML queries were added."
+ return result
+
+
+def generate_random_queries(impala, random_db):
+ """Generator function to produce random queries. 'impala' is the Impala service
+ object. 'random_db' is the name of the database that queries should be
+ generated for."""
+ with impala.cursor(db_name=random_db) as cursor:
+ tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
+ query_generator = QueryGenerator(DefaultProfile())
+ model_translator = SqlWriter.create()
+ while True:
+ query_model = query_generator.generate_statement(tables)
+ sql = model_translator.write_query(query_model)
+ query = Query()
+ query.sql = sql
+ query.db_name = random_db
+ yield query
+
+
+def estimate_query_mem_mb_usage(query, impalad_conn):
+ """Runs an explain plan then extracts and returns the estimated memory needed to run
+ the query.
+ """
+ with impalad_conn.cursor() as cursor:
+ LOG.debug("Using %s database", query.db_name)
+ if query.db_name:
+ cursor.execute('USE ' + query.db_name)
+ if query.query_type == QueryType.COMPUTE_STATS:
+ # Running "explain" on compute stats is not supported by Impala.
+ return
+ LOG.debug("Explaining query\n%s", query.sql)
+ cursor.execute('EXPLAIN ' + query.sql)
+ explain_rows = cursor.fetchall()
+ explain_lines = [row[0] for row in explain_rows]
+ mem_limit, units = match_memory_estimate(explain_lines)
+ return parse_mem_to_mb(mem_limit, units)
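Because generate_random_queries yields indefinitely, a consumer has to bound the iteration itself, as populate_runtime_info_for_random_queries does when selecting candidates. A standalone sketch, assuming 'impala' is a connected service object and the database name is a stand-in:

    from itertools import islice

    from tests.stress.queries import generate_random_queries

    # Take five generated statements against a hypothetical database.
    for query in islice(generate_random_queries(impala, "tpch_parquet"), 5):
        print(query.db_name, query.sql)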
diff --git a/tests/stress/runtime_info.py b/tests/stress/runtime_info.py
new file mode 100644
index 0000000..6cba385
--- /dev/null
+++ b/tests/stress/runtime_info.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Utility functions used by the stress test to save and load runtime info
+# about queries to and from JSON files.
+
+from collections import defaultdict
+import json
+import logging
+import os
+import sys
+
+from tests.stress.queries import Query
+
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+
+# The version of the file format containing the collected query runtime info.
+RUNTIME_INFO_FILE_VERSION = 3
+
+
+def save_runtime_info(path, query, impala):
+ """Updates the file at 'path' with the given query information."""
+ store = None
+ if os.path.exists(path):
+ with open(path) as file:
+ store = json.load(file)
+ _check_store_version(store)
+ if not store:
+ store = {
+ "host_names": list(), "db_names": dict(), "version": RUNTIME_INFO_FILE_VERSION}
+ with open(path, "w+") as file:
+ store["host_names"] = sorted([i.host_name for i in impala.impalads])
+ queries = store["db_names"].get(query.db_name, dict())
+ query_by_options = queries.get(query.sql, dict())
+ query_by_options[str(sorted(query.options.items()))] = query
+ queries[query.sql] = query_by_options
+ store["db_names"][query.db_name] = queries
+
+ class JsonEncoder(json.JSONEncoder):
+ def default(self, obj):
+ data = dict(obj.__dict__)
+ # Queries are stored by sql, so remove the duplicate data. Also don't store
+ # profiles as JSON values, but instead separately.
+ for k in ("sql", "solo_runtime_profile_with_spilling",
+ "solo_runtime_profile_without_spilling"):
+ if k in data:
+ del data[k]
+ return data
+ json.dump(
+ store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
+
+
+def load_runtime_info(path, impala=None):
+ """Reads the query runtime information at 'path' and returns a
+ dict<db_name, dict<sql, Query>>. Returns an empty dict if the hosts in the 'impala'
+ instance do not match the data in 'path'.
+ """
+ queries_by_db_and_sql = defaultdict(lambda: defaultdict(dict))
+ if not os.path.exists(path):
+ return queries_by_db_and_sql
+ with open(path) as file:
+ store = json.load(file)
+ _check_store_version(store)
+ if (
+ impala and
+ store.get("host_names") != sorted([i.host_name for i in impala.impalads])
+ ):
+ return queries_by_db_and_sql
+ for db_name, queries_by_sql in store["db_names"].iteritems():
+ for sql, queries_by_options in queries_by_sql.iteritems():
+ for options, json_query in queries_by_options.iteritems():
+ query = Query()
+ query.__dict__.update(json_query)
+ query.sql = sql
+ queries_by_db_and_sql[db_name][sql][options] = query
+ return queries_by_db_and_sql
+
+
+def _check_store_version(store):
+ """Clears 'store' if the version is too old or raises an error if the version is too
+ new.
+ """
+ if store["version"] < RUNTIME_INFO_FILE_VERSION:
+ LOG.warn("Runtime file info version is old and will be ignored")
+ store.clear()
+ elif store["version"] > RUNTIME_INFO_FILE_VERSION:
+ raise Exception(
+ "Unexpected runtime file info version %s expected %s"
+ % (store["version"], RUNTIME_INFO_FILE_VERSION))
+
+
+def print_runtime_info_comparison(old_runtime_info, new_runtime_info):
+ # TODO: Provide a way to call this from the CLI. This was hard coded to run from main()
+ # when it was used.
+ print(",".join([
+ "Database", "Query",
+ "Old Mem MB w/Spilling",
+ "New Mem MB w/Spilling",
+ "Diff %",
+ "Old Runtime w/Spilling",
+ "New Runtime w/Spilling",
+ "Diff %",
+ "Old Mem MB wout/Spilling",
+ "New Mem MB wout/Spilling",
+ "Diff %",
+ "Old Runtime wout/Spilling",
+ "New Runtime wout/Spilling",
+ "Diff %"]))
+ for db_name, old_queries in old_runtime_info.iteritems():
+ new_queries = new_runtime_info.get(db_name)
+ if not new_queries:
+ continue
+ for sql, old_query in old_queries.iteritems():
+ new_query = new_queries.get(sql)
+ if not new_query:
+ continue
+ sys.stdout.write(old_query["db_name"])
+ sys.stdout.write(",")
+ sys.stdout.write(old_query["name"])
+ sys.stdout.write(",")
+ for attr in [
+ "required_mem_mb_with_spilling", "solo_runtime_secs_with_spilling",
+ "required_mem_mb_without_spilling", "solo_runtime_secs_without_spilling"
+ ]:
+ old_value = old_query[attr]
+ sys.stdout.write(str(old_value))
+ sys.stdout.write(",")
+ new_value = new_query[attr]
+ sys.stdout.write(str(new_value))
+ sys.stdout.write(",")
+ if old_value and new_value is not None:
+ sys.stdout.write("%0.2f%%" % (100 * float(new_value - old_value) / old_value))
+ else:
+ sys.stdout.write("N/A")
+ sys.stdout.write(",")
+ print()
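Taken together, save_runtime_info and load_runtime_info round-trip a store that is keyed three levels deep: database name, then SQL text, then the stringified query options. A hedged usage sketch; the path and the 'query'/'impala' objects are stand-ins for real stress-test state:

    from tests.stress.runtime_info import load_runtime_info, save_runtime_info

    # 'query' is a populated Query and 'impala' a running service object.
    save_runtime_info("runtime_info.json", query, impala)
    store = load_runtime_info("runtime_info.json", impala)
    # Three-level lookup: db name -> sql -> stringified options -> Query.
    queries_by_options = store[query.db_name][query.sql]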