You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/10/08 17:11:59 UTC
[3/6] impala git commit: IMPALA-7643: report # queries actually executing in stress test
IMPALA-7643: report # queries actually executing in stress test
With admission control, it is useful to distinguish between two categories
of queries:
1. Queries that have started up and are executing
2. Queries that have not made it that far yet, e.g. because they are waiting
to establish a client connection (hitting the --fe_service_threads limit),
are being planned, are queued in admission control, or are still starting
up.
We now report categories 1 and 2 together as "Active" and category 1 alone as "Executing".
Example output:
Done | Active | Executing | Mem Lmt Ex | AC Reject | AC Timeout | Time Out | Cancel | Err | Incorrect | Next Qry Mem Lmt | Tot Qry Mem Lmt | Tracked Mem | RSS Mem
0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | |
0 | 10 | 3 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 510 | 3922 | 158 | 4541
0 | 20 | 10 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 390 | 8534 | 570 | 4517
Refactored QueryRunner.run_query() to reduce nesting and make it more
readable by extracting the _set_db_and_options() and _wait_until_fetchable()
helpers.
Testing:
Ran local stress tests with and without --test_admission_control set.
Change-Id: I5692e8e5ba3224becefc24437197bf5a5b450335
Reviewed-on: http://gerrit.cloudera.org:8080/11587
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/81c58d5d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/81c58d5d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/81c58d5d
Branch: refs/heads/master
Commit: 81c58d5de0d0295b5535ff15afb284bccb6b0026
Parents: d3db326
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Oct 2 17:26:51 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 6 03:13:42 2018 +0000
----------------------------------------------------------------------
tests/stress/concurrent_select.py | 175 ++++++++++++++++++++++-----------
1 file changed, 117 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/81c58d5d/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 80a2386..844c245 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -427,9 +427,9 @@ class StressRunner(object):
self._mem_mb_needed_for_next_query = Value("i", 0)
# This lock provides a way to stop new queries from running. This lock must be
- # acquired before writing to _num_queries_started. Before query submission
- # _num_queries_started must be incremented. Reading _num_queries_started is allowed
- # without taking this lock.
+ # acquired before writing to _num_queries_submitted. Before query submission
+ # _num_queries_submitted must be incremented. Reading _num_queries_submitted is
+ # allowed without taking this lock.
self._submit_query_lock = Lock()
self.leak_check_interval_mins = None
@@ -439,7 +439,11 @@ class StressRunner(object):
# All values below are cumulative.
self._num_queries_dequeued = Value("i", 0)
- self._num_queries_started = Value("i", 0)
+ # The number of queries that were submitted to a query runner.
+ self._num_queries_submitted = Value("i", 0)
+ # The number of queries that have entered the RUNNING state (i.e. got through Impala's
+ # admission control and started executing) or were cancelled or hit an error.
+ self._num_queries_started_running_or_cancelled = Value("i", 0)
self._num_queries_finished = Value("i", 0)
self._num_queries_exceeded_mem_limit = Value("i", 0)
self._num_queries_ac_rejected = Value("i", 0)
@@ -458,9 +462,9 @@ class StressRunner(object):
self.results_dir = gettempdir()
self._status_headers = [
- "Done", "Running", "Mem Lmt Ex", "AC Reject", "AC Timeout", "Time Out", "Cancel",
- "Err", "Incorrect", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem",
- "RSS Mem"]
+ "Done", "Active", "Executing", "Mem Lmt Ex", "AC Reject", "AC Timeout",
+ "Time Out", "Cancel", "Err", "Incorrect", "Next Qry Mem Lmt",
+ "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
self._num_queries_to_run = None
self._query_producer_thread = None
@@ -563,11 +567,11 @@ class StressRunner(object):
def _start_consuming_queries(self, impala):
def start_additional_runners_if_needed():
try:
- while self._num_queries_started.value < self._num_queries_to_run:
+ while self._num_queries_submitted.value < self._num_queries_to_run:
sleep(1.0 / self.startup_queries_per_sec)
# Remember num dequeued/started are cumulative.
with self._submit_query_lock:
- if self._num_queries_dequeued.value != self._num_queries_started.value:
+ if self._num_queries_dequeued.value != self._num_queries_submitted.value:
# Assume dequeued queries are stuck waiting for cluster resources so there
# is no point in starting an additional runner.
continue
@@ -597,10 +601,10 @@ class StressRunner(object):
# while no queries were running.
ready_to_unlock = None
try:
- while self._num_queries_started.value < self._num_queries_to_run:
+ while self._num_queries_submitted.value < self._num_queries_to_run:
if ready_to_unlock:
assert query_sumbission_is_locked, "Query submission not yet locked"
- assert not self._num_queries_running, "Queries are still running"
+ assert not self._num_queries_active, "Queries are still running"
LOG.debug("Resuming query submission")
self._next_leak_check_unix_time.value = int(
time() + 60 * self.leak_check_interval_mins)
@@ -613,7 +617,7 @@ class StressRunner(object):
self.leak_check_interval_mins and
time() > self._next_leak_check_unix_time.value
):
- assert self._num_queries_running <= len(self._query_runners), \
+ assert self._num_queries_active <= len(self._query_runners), \
"Each running query should belong to a runner"
LOG.debug("Stopping query submission")
self._submit_query_lock.acquire()
@@ -638,7 +642,7 @@ class StressRunner(object):
max_actual = -1
self._set_mem_usage_values(max_reported, max_actual)
- if query_sumbission_is_locked and not self._num_queries_running:
+ if query_sumbission_is_locked and not self._num_queries_active:
if ready_to_unlock is None:
ready_to_unlock = False
else:
@@ -670,17 +674,33 @@ class StressRunner(object):
self._max_mem_mb_usage.value = actual
@property
- def _num_queries_running(self):
- num_running = self._num_queries_started.value - self._num_queries_finished.value
+ def _num_queries_active(self):
+ """The number of queries that are currently active (i.e. submitted to a query runner
+ and haven't yet completed)."""
+ num_running = self._num_queries_submitted.value - self._num_queries_finished.value
assert num_running >= 0, "The number of running queries is negative"
return num_running
+ @property
+ def _num_queries_executing(self):
+ """The number of queries that are currently executing (i.e. entered the RUNNING state
+ and haven't yet completed)."""
+ num_executing = (self._num_queries_started_running_or_cancelled.value -
+ self._num_queries_finished.value)
+ assert num_executing >= 0, "The number of executing queries is negative"
+ return num_executing
+
+ def increment_num_queries_started_running_or_cancelled(self):
+ """Called by query runner to increment _num_queries_started_running_or_cancelled."""
+ increment(self._num_queries_started_running_or_cancelled)
+
+
def _start_single_runner(self, impalad):
"""Consumer function to take a query of the queue and run it. This is intended to
run in a separate process so validating the result set can use a full CPU.
"""
LOG.debug("New query runner started")
- runner = QueryRunner()
+ runner = QueryRunner(self)
runner.impalad = impalad
runner.results_dir = self.results_dir
runner.use_kerberos = self.use_kerberos
@@ -714,7 +734,7 @@ class StressRunner(object):
solo_runtime = query.solo_runtime_secs_with_spilling
LOG.debug("Waiting for other query runners to start their queries")
- while query_idx > self._num_queries_started.value:
+ while query_idx > self._num_queries_submitted.value:
sleep(0.1)
self._mem_mb_needed_for_next_query.value = mem_limit
@@ -723,13 +743,13 @@ class StressRunner(object):
with self._mem_broker.reserve_mem_mb(mem_limit) as reservation_id:
LOG.debug("Received memory reservation")
with self._submit_query_lock:
- increment(self._num_queries_started)
+ increment(self._num_queries_submitted)
should_cancel = self.cancel_probability > random()
if should_cancel:
timeout = randrange(1, max(int(solo_runtime), 2))
else:
timeout = solo_runtime * max(
- 10, self._num_queries_started.value - self._num_queries_finished.value)
+ 10, self._num_queries_submitted.value - self._num_queries_finished.value)
report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
LOG.debug("Got execution report for query")
if report.timed_out and should_cancel:
@@ -811,8 +831,10 @@ class StressRunner(object):
print(status_format % (
# Done
self._num_queries_finished.value,
- # Running
- self._num_queries_started.value - self._num_queries_finished.value,
+ # Active
+ self._num_queries_active,
+ # Executing
+ self._num_queries_executing,
# Mem Lmt Ex
self._num_queries_exceeded_mem_limit.value,
# AC Rejected
@@ -1010,7 +1032,11 @@ class QueryRunner(object):
SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
BATCH_SIZE = 1024
- def __init__(self):
+ def __init__(self, stress_runner=None):
+ """Creates a new instance. The caller must fill in the below fields. stress_runner
+ must be provided if this is running in the context of a stress run, so that statistics
+ can be updated."""
+ self.stress_runner = stress_runner
self.impalad = None
self.impalad_conn = None
self.use_kerberos = False
@@ -1043,29 +1069,7 @@ class QueryRunner(object):
try:
with self.impalad_conn.cursor() as cursor:
start_time = time()
- if query.db_name:
- LOG.debug("Using %s database", query.db_name)
- cursor.execute("USE %s" % query.db_name)
- if run_set_up and query.set_up_sql:
- LOG.debug("Running set up query:\n%s", self.set_up_sql)
- cursor.execute(query.set_up_sql)
- for query_option, value in self.common_query_options.iteritems():
- cursor.execute(
- "SET {query_option}={value}".format(query_option=query_option, value=value))
- for query_option, value in query.options.iteritems():
- cursor.execute(
- "SET {query_option}={value}".format(query_option=query_option, value=value))
- cursor.execute("SET ABORT_ON_ERROR=1")
- if self.test_admission_control:
- LOG.debug(
- "Running query without mem limit at %s with timeout secs %s:\n%s",
- self.impalad.host_name, timeout_secs, query.sql)
- else:
- LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
- cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
- LOG.debug(
- "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
- mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+ self._set_db_and_options(cursor, query, run_set_up, mem_limit_mb, timeout_secs)
error = None
try:
cursor.execute_async(
@@ -1074,19 +1078,10 @@ class QueryRunner(object):
report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
cursor._last_operation else None)
LOG.debug("Query id is %s", report.query_id)
- sleep_secs = 0.1
- secs_since_log = 0
- while cursor.is_executing():
- if time() > timeout_unix_time:
- if not should_cancel:
- fetch_and_set_profile(cursor, report)
- self._cancel(cursor, report)
- return report
- if secs_since_log > 5:
- secs_since_log = 0
- LOG.debug("Waiting for query to execute")
- sleep(sleep_secs)
- secs_since_log += sleep_secs
+ if not self._wait_until_fetchable(cursor, report, timeout_unix_time,
+ should_cancel):
+ return report
+
if query.query_type == QueryType.SELECT:
try:
report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
@@ -1118,6 +1113,70 @@ class QueryRunner(object):
report.other_error = error
return report
+ def _set_db_and_options(self, cursor, query, run_set_up, mem_limit_mb, timeout_secs):
+ """Set up a new cursor for running a query by switching to the correct database and
+ setting query options."""
+ if query.db_name:
+ LOG.debug("Using %s database", query.db_name)
+ cursor.execute("USE %s" % query.db_name)
+ if run_set_up and query.set_up_sql:
+ LOG.debug("Running set up query:\n%s", query.set_up_sql)
+ cursor.execute(query.set_up_sql)
+ for query_option, value in self.common_query_options.iteritems():
+ cursor.execute(
+ "SET {query_option}={value}".format(query_option=query_option, value=value))
+ for query_option, value in query.options.iteritems():
+ cursor.execute(
+ "SET {query_option}={value}".format(query_option=query_option, value=value))
+ cursor.execute("SET ABORT_ON_ERROR=1")
+ if self.test_admission_control:
+ LOG.debug(
+ "Running query without mem limit at %s with timeout secs %s:\n%s",
+ self.impalad.host_name, timeout_secs, query.sql)
+ else:
+ LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
+ cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
+ LOG.debug(
+ "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+ mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+
+ def _wait_until_fetchable(self, cursor, report, timeout_unix_time, should_cancel):
+ """Wait up until timeout_unix_time until the query results can be fetched (if it's
+ a SELECT query) or until it has finished executing (if it's a different query type
+ like DML). If the timeout expires we either cancel the query or report the timeout.
+ Return True in the first case or False in the second (timeout) case."""
+ # Loop until the query gets to the right state or a timeout expires.
+ sleep_secs = 0.1
+ secs_since_log = 0
+ # True if we incremented num_queries_started_running_or_cancelled for this query.
+ started_running_or_cancelled = False
+ while True:
+ query_state = cursor.status()
+ # Check if the query got past the PENDING/INITIALIZED states, either because
+ # it's executing or hit an error.
+ if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
+ 'INITIALIZED_STATE')):
+ started_running_or_cancelled = True
+ if self.stress_runner:
+ self.stress_runner.increment_num_queries_started_running_or_cancelled()
+ # Return if we're ready to fetch results (in the FINISHED state) or we are in
+ # another terminal state like EXCEPTION.
+ if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
+ return True
+
+ if time() > timeout_unix_time:
+ if not should_cancel:
+ fetch_and_set_profile(cursor, report)
+ self._cancel(cursor, report)
+ if not started_running_or_cancelled and self.stress_runner:
+ self.stress_runner.increment_num_queries_started_running_or_cancelled()
+ return False
+ if secs_since_log > 5:
+ secs_since_log = 0
+ LOG.debug("Waiting for query to execute")
+ sleep(sleep_secs)
+ secs_since_log += sleep_secs
+
def _cancel(self, cursor, report):
report.timed_out = True