You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 05:00:47 UTC

[impala] 01/03: IMPALA-6662: Make stress test resilient to hangs due to client crashes

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 95414528199011716af0c55ac9c11eb69fb442b7
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Feb 19 00:33:23 2019 -0800

    IMPALA-6662: Make stress test resilient to hangs due to client crashes
    
    Thanks to Sailesh Mukil for the initial version of this patch.
    
    The concurrent_select.py process starts multiple subprocesses
    (called query runners) to run the queries. It also starts two threads:
    the query producer thread and the query consumer thread. The
    query producer thread adds queries to a query queue and the query
    consumer thread pulls them off the queue and feeds them to the
    query runners.
    
    The query runner, once it gets queries, does the following:
    ...
      with _submit_query_lock:
        increment(num_queries_started)
      run_query()    # One runner crashes here.
      increment(num_queries_finished)
    ...
    
    One of the runners crashes inside run_query(), thereby never incrementing
    num_queries_finished.
    
    Another thread, which is supposed to check for memory leaks
    (but actually doesn't), periodically acquires '_submit_query_lock' and
    waits for the number of running queries to reach 0 before releasing the
    lock.
    
    However, in the above case, the number of running queries will never
    reach 0 because one of the query runners exited without incrementing
    'num_queries_finished'. Therefore, the poll_mem_usage() function will
    hold the lock indefinitely, so no new queries are submitted and the
    stress test never finishes running.
    
    This patch fixes the problem by changing the global trackers
    (num_queries_started, num_queries_finished, etc.) to be kept on a
    per-QueryRunner basis. Anytime we want to find the total number of queries
    started/finished/cancelled, etc., we aggregate the values from all the
    runners. We synchronize access by adding a new lock called
    _query_runners_lock.
    
    In _wait_for_test_to_finish(), we periodically check whether a QueryRunner
    has died, and if it has, we make sure to bring its num_queries_finished up
    to num_queries_started, since it may have died before updating the
    'finished' value, and we also count the error.
    
    Other changes:
    * Boilerplate code is reduced by storing all metrics in a dictionary
      keyed by the metric name, instead of stamping out the code for
      10+ variables.
    * Added more comments and debug strings
    * Reformatted some code.
    
    Testing:
    Ran the stress test with the new patch locally and against a cluster.
    
    Change-Id: I525bf13e0f3dd660c0d9f5c2bf6eb292e7ebb8af
    Reviewed-on: http://gerrit.cloudera.org:8080/12521
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/stress/concurrent_select.py | 337 +++++++++++++++++++++++++-------------
 1 file changed, 222 insertions(+), 115 deletions(-)

diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 00004a3..e352d00 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -69,6 +69,7 @@ from Queue import Empty   # Must be before Queue below
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, SUPPRESS
 from collections import defaultdict
 from contextlib import contextmanager
+from copy import copy
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
 from random import choice, random, randrange, shuffle
@@ -98,6 +99,22 @@ RESULT_HASHES_DIR = "result_hashes"
 # The version of the file format containing the collected query runtime info.
 RUNTIME_INFO_FILE_VERSION = 3
 
+# Metrics collected during the stress running process.
+NUM_QUERIES_DEQUEUED = "num_queries_dequeued"
+# The number of queries that were submitted to a query runner.
+NUM_QUERIES_SUBMITTED = "num_queries_submitted"
+# The number of queries that have entered the RUNNING state (i.e. got through Impala's
+# admission control and started executing) or were cancelled or hit an error.
+NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED = "num_queries_started_running_or_cancelled"
+NUM_QUERIES_FINISHED = "num_queries_finished"
+NUM_QUERIES_EXCEEDED_MEM_LIMIT = "num_queries_exceeded_mem_limit"
+NUM_QUERIES_AC_REJECTED = "num_queries_ac_rejected"
+NUM_QUERIES_AC_TIMEDOUT = "num_queries_ac_timedout"
+NUM_QUERIES_CANCELLED = "num_queries_cancelled"
+NUM_QUERIES_TIMEDOUT = "num_queries_timedout"
+NUM_RESULT_MISMATCHES = "num_result_mismatches"
+NUM_OTHER_ERRORS = "num_other_errors"
+
 
 class StressArgConverter(object):
   def __init__(self, args):
@@ -427,8 +444,8 @@ class StressRunner(object):
     self._mem_mb_needed_for_next_query = Value("i", 0)
 
     # This lock provides a way to stop new queries from running. This lock must be
-    # acquired before writing to _num_queries_submitted. Before query submission
-    # _num_queries_submitted must be incremented. Reading _num_queries_submitted is
+    # acquired before writing to the NUM_QUERIES_SUBMITTED metric for the query_runner,
+    # which is incremented before every query submission. Reading NUM_QUERIES_SUBMITTED is
     # allowed without taking this lock.
     self._submit_query_lock = Lock()
 
@@ -437,22 +454,6 @@ class StressRunner(object):
     self._max_mem_mb_reported_usage = Value("i", -1)   # -1 => Unknown
     self._max_mem_mb_usage = Value("i", -1)   # -1 => Unknown
 
-    # All values below are cumulative.
-    self._num_queries_dequeued = Value("i", 0)
-    # The number of queries that were submitted to a query runner.
-    self._num_queries_submitted = Value("i", 0)
-    # The number of queries that have entered the RUNNING state (i.e. got through Impala's
-    # admission control and started executing) or were cancelled or hit an error.
-    self._num_queries_started_running_or_cancelled = Value("i", 0)
-    self._num_queries_finished = Value("i", 0)
-    self._num_queries_exceeded_mem_limit = Value("i", 0)
-    self._num_queries_ac_rejected = Value("i", 0)
-    self._num_queries_ac_timedout = Value("i", 0)
-    self._num_queries_cancelled = Value("i", 0)
-    self._num_queries_timedout = Value("i", 0)
-    self._num_result_mismatches = Value("i", 0)
-    self._num_other_errors = Value("i", 0)
-
     self.cancel_probability = 0
     self.spill_probability = 0
 
@@ -468,10 +469,65 @@ class StressRunner(object):
 
     self._num_queries_to_run = None
     self._query_producer_thread = None
-    self._query_runners = list()
+
+    # This lock is used to synchronize access to the '_query_runners' list and also to all
+    # the '_past_runners*' members.
+    self._query_runners_lock = Lock()
+    self._query_runners = []
+
+    # These are the cumulative values of all the queries that have started/finished/-
+    # dequeued, etc. on runners that have already died. Every time we notice that a query
+    # runner has died, we update these values.
+    self._past_runner_metrics = defaultdict(lambda: 0)
+
     self._query_consumer_thread = None
     self._mem_polling_thread = None
 
+  def _record_runner_metrics_before_evict(self, query_runner):
+    """ Before removing 'query_runner' from the self._query_runners list, record its
+        metrics. Must only be called if 'query_runner' is to be removed from the list.
+        MUST hold '_query_runners_lock' before calling.
+    """
+    for key, synchronized_val in query_runner._metrics.iteritems():
+      self._past_runner_metrics[key] += synchronized_val.value
+
+  def _calc_total_runner_metrics(self):
+    """ Calculate the total of metrics across past and active query runners. """
+    totals = copy(self._past_runner_metrics)
+    with self._query_runners_lock:
+      for query_runner in self._query_runners:
+        for key, synchronized_val in query_runner._metrics.iteritems():
+          totals[key] += synchronized_val.value
+    return totals
+
+  def _calc_total_runner_metric(self, key):
+    """ Calculate the total of metric 'key' across past and active query runners. """
+    with self._query_runners_lock:
+      return self._calc_total_runner_metric_no_lock(key)
+
+  def _calc_total_runner_metric_no_lock(self, key):
+    """ TODO: Get rid of this function after reformatting how we obtain query indices.
+        _query_runners_lock MUST be taken before calling this function.
+    """
+    total = self._past_runner_metrics[key]
+    for runner in self._query_runners:
+      total += runner._metrics[key].value
+    return total
+
+  def _total_num_queries_submitted(self):
+    return self._calc_total_runner_metric(NUM_QUERIES_SUBMITTED)
+
+  def _total_num_queries_active(self):
+    """The number of queries that are currently active (i.e. submitted to a query runner
+    and haven't yet completed)."""
+    metrics = self._calc_total_runner_metrics()
+    num_running = metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED]
+    assert num_running >= 0, "The number of running queries is negative"
+    return num_running
+
+  def _num_runners_remaining(self):
+    return len(self._query_runners)
+
   def run_queries(
       self, queries, impala, num_queries_to_run, mem_overcommit_pct, should_print_status,
       verify_results, select_probability
@@ -561,17 +617,26 @@ class StressRunner(object):
         LOG.error("Error producing queries: %s", e)
         current_thread().error = e
         raise e
+      LOG.info("Producing thread completed job. Exiting...")
     self._query_producer_thread = create_and_start_daemon_thread(
         enqueue_queries, "Query Producer")
 
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
       try:
-        while self._num_queries_submitted.value < self._num_queries_to_run:
+        while self._total_num_queries_submitted() < self._num_queries_to_run:
+          # TODO: sleeping for the below amount leads to slower submission than the goal,
+          # because it does not factor in the time spent by this thread outside of the
+          # sleep() call.
           sleep(1.0 / self.startup_queries_per_sec)
           # Remember num dequeued/started are cumulative.
           with self._submit_query_lock:
-            if self._num_queries_dequeued.value != self._num_queries_submitted.value:
+            metrics = self._calc_total_runner_metrics()
+            num_dequeued = metrics[NUM_QUERIES_DEQUEUED]
+            num_submitted = metrics[NUM_QUERIES_SUBMITTED]
+            LOG.debug("Submitted {0} queries. Dequeued {1} queries".format(
+                num_submitted, num_dequeued))
+            if num_dequeued != num_submitted:
               # Assume dequeued queries are stuck waiting for cluster resources so there
               # is no point in starting an additional runner.
               continue
@@ -579,10 +644,21 @@ class StressRunner(object):
           if self.max_coordinators > 0:
             num_coordinators = min(num_coordinators, self.max_coordinators)
           impalad = impala.impalads[len(self._query_runners) % num_coordinators]
-          runner = Process(target=self._start_single_runner, args=(impalad, ))
-          runner.daemon = True
-          self._query_runners.append(runner)
-          runner.start()
+
+          query_runner = QueryRunner()
+          query_runner.impalad = impalad
+          query_runner.results_dir = self.results_dir
+          query_runner.use_kerberos = self.use_kerberos
+          query_runner.common_query_options = self.common_query_options
+          query_runner.test_admission_control = self.test_admission_control
+          query_runner.proc = \
+              Process(target=self._start_single_runner, args=(query_runner, ))
+          query_runner.proc.daemon = True
+          with self._query_runners_lock:
+            self._query_runners.append(query_runner)
+          query_runner.proc.start()
+
+        LOG.info("Consuming thread completed job. Exiting...")
       except Exception as e:
         LOG.error("Error consuming queries: %s", e)
         current_thread().error = e
@@ -601,10 +677,10 @@ class StressRunner(object):
       # while no queries were running.
       ready_to_unlock = None
       try:
-        while self._num_queries_submitted.value < self._num_queries_to_run:
+        while self._total_num_queries_submitted() < self._num_queries_to_run:
           if ready_to_unlock:
             assert query_submission_is_locked, "Query submission not yet locked"
-            assert not self._num_queries_active, "Queries are still running"
+            assert not self._total_num_queries_active(), "Queries are still running"
             LOG.debug("Resuming query submission")
             self._next_leak_check_unix_time.value = int(
                 time() + 60 * self.leak_check_interval_mins)
@@ -617,7 +693,7 @@ class StressRunner(object):
               self.leak_check_interval_mins and
               time() > self._next_leak_check_unix_time.value
           ):
-            assert self._num_queries_active <= len(self._query_runners), \
+            assert self._total_num_queries_active() <= self._num_runners_remaining(), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
             self._submit_query_lock.acquire()
@@ -642,7 +718,7 @@ class StressRunner(object):
             max_actual = -1
           self._set_mem_usage_values(max_reported, max_actual)
 
-          if query_submission_is_locked and not self._num_queries_active:
+          if query_submission_is_locked and not self._total_num_queries_active():
             if ready_to_unlock is None:
               ready_to_unlock = False
             else:
@@ -673,40 +749,15 @@ class StressRunner(object):
         self._max_mem_mb_reported_usage.value = reported
         self._max_mem_mb_usage.value = actual
 
-  @property
-  def _num_queries_active(self):
-    """The number of queries that are currently active (i.e. submitted to a query runner
-    and haven't yet completed)."""
-    num_running = self._num_queries_submitted.value - self._num_queries_finished.value
-    assert num_running >= 0, "The number of running queries is negative"
-    return num_running
-
-  @property
-  def _num_queries_executing(self):
-    """The number of queries that are currently executing (i.e. entered the RUNNING state
-    and haven't yet completed)."""
-    num_executing = (self._num_queries_started_running_or_cancelled.value -
-        self._num_queries_finished.value)
-    assert num_executing >= 0, "The number of executing queries is negative"
-    return num_executing
-
-  def increment_num_queries_started_running_or_cancelled(self):
-    """Called by query runner to increment _num_queries_started_running_or_cancelled."""
-    increment(self._num_queries_started_running_or_cancelled)
-
-
-  def _start_single_runner(self, impalad):
+  def _start_single_runner(self, query_runner):
     """Consumer function to take a query of the queue and run it. This is intended to
     run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
-    runner = QueryRunner(self)
-    runner.impalad = impalad
-    runner.results_dir = self.results_dir
-    runner.use_kerberos = self.use_kerberos
-    runner.common_query_options = self.common_query_options
-    runner.test_admission_control = self.test_admission_control
-    runner.connect()
+
+    # The query runner should already be set up. We just need to connect() before using
+    # the runner.
+    query_runner.connect()
 
     while not self._query_queue.empty():
       try:
@@ -717,9 +768,10 @@ class StressRunner(object):
         LOG.debug("Query running aborting due to closed query queue")
         break
       LOG.debug("Getting query_idx")
-      with self._num_queries_dequeued.get_lock():
-        query_idx = self._num_queries_dequeued.value
-        self._num_queries_dequeued.value += 1
+      with self._query_runners_lock:
+        query_idx = self._calc_total_runner_metric_no_lock(NUM_QUERIES_DEQUEUED)
+        increment(query_runner._metrics[NUM_QUERIES_DEQUEUED])
+        LOG.debug("Query_idx: {0} | PID: {1}".format(query_idx, query_runner.proc.pid))
 
       if not query.required_mem_mb_without_spilling:
         mem_limit = query.required_mem_mb_with_spilling
@@ -734,7 +786,7 @@ class StressRunner(object):
         solo_runtime = query.solo_runtime_secs_with_spilling
 
       LOG.debug("Waiting for other query runners to start their queries")
-      while query_idx > self._num_queries_submitted.value:
+      while query_idx > self._total_num_queries_submitted():
         sleep(0.1)
 
       self._mem_mb_needed_for_next_query.value = mem_limit
@@ -743,18 +795,21 @@ class StressRunner(object):
       with self._mem_broker.reserve_mem_mb(mem_limit) as reservation_id:
         LOG.debug("Received memory reservation")
         with self._submit_query_lock:
-          increment(self._num_queries_submitted)
+          increment(query_runner._metrics[NUM_QUERIES_SUBMITTED])
+
         should_cancel = self.cancel_probability > random()
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
+          metrics = self._calc_total_runner_metrics()
           timeout = solo_runtime * max(
-              10, self._num_queries_submitted.value - self._num_queries_finished.value)
-        report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
+              10, metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED])
+        report = query_runner.run_query(query, timeout, mem_limit,
+            should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
           report.was_cancelled = True
-        self._update_from_query_report(report)
+        query_runner.update_from_query_report(report)
         if report.other_error:
           error_msg = str(report.other_error)
           # There is a possible race during cancellation. If a fetch request fails (for
@@ -783,7 +838,7 @@ class StressRunner(object):
             self._num_successive_errors.value = 0
             continue
           increment(self._num_successive_errors)
-          increment(self._num_other_errors)
+          increment(query_runner._metrics[NUM_OTHER_ERRORS])
           self._write_query_profile(report, PROFILES_DIR, prefix='error')
           raise Exception("Query {query} ID {id} failed: {mesg}".format(
               query=query.logical_query_id,
@@ -803,7 +858,7 @@ class StressRunner(object):
             (self._verify_results and report.result_hash != query.result_hash)
         ):
           increment(self._num_successive_errors)
-          increment(self._num_result_mismatches)
+          increment(query_runner._metrics[NUM_RESULT_MISMATCHES])
           self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
           raise Exception(dedent("""\
                                  Result hash mismatch; expected {expected}, got {actual}
@@ -818,6 +873,7 @@ class StressRunner(object):
               "Query {query} unexpectedly timed out. Query ID: {id}".format(
                   query=query.logical_query_id, id=report.query_id))
         self._num_successive_errors.value = 0
+    LOG.debug("Query runner completed...")
 
   def _print_status_header(self):
     print(" | ".join(self._status_headers))
@@ -826,29 +882,31 @@ class StressRunner(object):
     if print_header:
       self._print_status_header()
 
+    metrics = self._calc_total_runner_metrics()
     reported_mem, actual_mem = self._get_mem_usage_values(reset=True)
     status_format = " | ".join(["%%%ss" % len(header) for header in self._status_headers])
     print(status_format % (
         # Done
-        self._num_queries_finished.value,
+        metrics[NUM_QUERIES_FINISHED],
         # Active
-        self._num_queries_active,
+        metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED],
         # Executing
-        self._num_queries_executing,
+        metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED] -
+        metrics[NUM_QUERIES_FINISHED],
         # Mem Lmt Ex
-        self._num_queries_exceeded_mem_limit.value,
+        metrics[NUM_QUERIES_EXCEEDED_MEM_LIMIT],
         # AC Rejected
-        self._num_queries_ac_rejected.value,
+        metrics[NUM_QUERIES_AC_REJECTED],
         # AC Timed Out
-        self._num_queries_ac_timedout.value,
+        metrics[NUM_QUERIES_AC_TIMEDOUT],
         # Time Out
-        self._num_queries_timedout.value - self._num_queries_cancelled.value,
+        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED],
         # Cancel
-        self._num_queries_cancelled.value,
+        metrics[NUM_QUERIES_CANCELLED],
         # Err
-        self._num_other_errors.value,
+        metrics[NUM_OTHER_ERRORS],
         # Incorrect
-        self._num_result_mismatches.value,
+        metrics[NUM_RESULT_MISMATCHES],
         # Next Qry Mem Lmt
         self._mem_mb_needed_for_next_query.value,
         # Total Qry Mem Lmt
@@ -858,19 +916,6 @@ class StressRunner(object):
         # RSS Mem
         "" if actual_mem == -1 else actual_mem))
 
-  def _update_from_query_report(self, report):
-    LOG.debug("Updating runtime stats")
-    increment(self._num_queries_finished)
-    if report.not_enough_memory:
-      increment(self._num_queries_exceeded_mem_limit)
-    if report.ac_rejected:
-      increment(self._num_queries_ac_rejected)
-    if report.ac_timedout:
-      increment(self._num_queries_ac_timedout)
-    if report.was_cancelled:
-      increment(self._num_queries_cancelled)
-    if report.timed_out:
-      increment(self._num_queries_timedout)
 
   def _write_query_profile(self, report, subdir, prefix=None):
     report.write_query_profile(
@@ -886,10 +931,10 @@ class StressRunner(object):
       sys.exit(1)
 
   def _check_for_test_failure(self):
+    metrics = self._calc_total_runner_metrics()
     if (
-        self._num_other_errors.value > 0 or
-        self._num_result_mismatches.value > 0 or
-        self._num_queries_timedout.value - self._num_queries_cancelled.value > 0
+        metrics[NUM_OTHER_ERRORS] > 0 or metrics[NUM_RESULT_MISMATCHES] > 0 or
+        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED] > 0
     ):
       LOG.error("Failing the stress test due to unexpected errors, incorrect results, or "
                 "timed out queries. See the report line above for details.")
@@ -901,10 +946,11 @@ class StressRunner(object):
     lines_printed = 1
     sleep_secs = 0.1
 
+    num_runners_remaining = self._num_runners_remaining()
     while (
         self._query_producer_thread.is_alive() or
         self._query_consumer_thread.is_alive() or
-        self._query_runners
+        num_runners_remaining
     ):
       if self._query_producer_thread.error or self._query_consumer_thread.error:
         # This is bad enough to abort early. A failure here probably means there's a
@@ -912,32 +958,64 @@ class StressRunner(object):
         # not critical so is ignored.
         LOG.error("Aborting due to error in producer/consumer")
         sys.exit(1)
-      checked_for_crashes = False
-      for idx, runner in enumerate(self._query_runners):
-        if runner.exitcode is not None:
-          if runner.exitcode != 0:
-            if not checked_for_crashes:
-              LOG.info("Checking for crashes")
-              if print_crash_info_if_exists(impala, self.start_time):
-                self.print_duration()
-                sys.exit(runner.exitcode)
-              LOG.info("No crashes detected")
-              checked_for_crashes = True
-            self._check_successive_errors()
-          del self._query_runners[idx]
+      do_check_for_impala_crashes = False
+      with self._query_runners_lock:
+        for idx, runner in enumerate(self._query_runners):
+          if runner.proc.exitcode is not None:
+            if runner.proc.exitcode != 0:
+              # Since at least one query runner process failed, make sure to check for
+              # crashed impalads.
+              do_check_for_impala_crashes = True
+              # TODO: Handle case for num_queries_dequeued != num_queries_submitted
+              num_submitted = runner._metrics[NUM_QUERIES_SUBMITTED].value
+              num_finished = runner._metrics[NUM_QUERIES_FINISHED].value
+              if num_submitted != num_finished:
+                # The query runner process may have crashed before updating the number
+                # of finished queries but after it incremented the number of queries
+                # submitted.
+                assert num_submitted - num_finished == 1
+                increment(runner._metrics[NUM_QUERIES_FINISHED])
+                # Since we know that the runner crashed while trying to run a query, we
+                # count it as an 'other error'
+                increment(runner._metrics[NUM_OTHER_ERRORS])
+              self._check_successive_errors()
+            assert runner._metrics[NUM_QUERIES_SUBMITTED].value == \
+                    runner._metrics[NUM_QUERIES_FINISHED].value, \
+                    str([(k, v.value) for k, v in runner._metrics.iteritems()])
+            # Make sure to record all the metrics before removing this runner from the
+            # list.
+            print("Query runner ({0}) exited with exit code {1}".format(
+                runner.proc.pid, runner.proc.exitcode))
+            self._record_runner_metrics_before_evict(self._query_runners[idx])
+
+            # Remove the query runner from the list.
+            del self._query_runners[idx]
+
+      if do_check_for_impala_crashes:
+        # Since we know that at least one query runner failed, check if any of the Impala
+        # daemons themselves crashed.
+        LOG.info("Checking for Impala crashes")
+        if print_crash_info_if_exists(impala, self.start_time):
+          self.print_duration()
+          sys.exit(runner.proc.exitcode)
+        do_check_for_impala_crashes = False
+        LOG.info("No Impala crashes detected")
+
       sleep(sleep_secs)
+      num_runners_remaining = self._num_runners_remaining()
+
       if should_print_status:
         last_report_secs += sleep_secs
         if last_report_secs > 5:
           if (
               not self._query_producer_thread.is_alive() or
               not self._query_consumer_thread.is_alive() or
-              not self._query_runners
+              not num_runners_remaining
           ):
             LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
             LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
             LOG.debug("Queue size: %s" % self._query_queue.qsize())
-            LOG.debug("Runners: %s" % len(self._query_runners))
+            LOG.debug("Runners: %s" % num_runners_remaining)
           last_report_secs = 0
           lines_printed %= 50
           self._print_status(print_header=(lines_printed == 0))
@@ -1043,6 +1121,22 @@ class QueryRunner(object):
     self.results_dir = gettempdir()
     self.check_if_mem_was_spilled = False
     self.common_query_options = {}
+    self.proc = None
+
+    # All these values are shared values between processes. We want these to be accessible
+    # by the parent process that started this QueryRunner, for operational purposes.
+    self._metrics = {
+        NUM_QUERIES_DEQUEUED: Value("i", 0),
+        NUM_QUERIES_SUBMITTED: Value("i", 0),
+        NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED: Value("i", 0),
+        NUM_QUERIES_FINISHED: Value("i", 0),
+        NUM_QUERIES_EXCEEDED_MEM_LIMIT: Value("i", 0),
+        NUM_QUERIES_AC_REJECTED: Value("i", 0),
+        NUM_QUERIES_AC_TIMEDOUT: Value("i", 0),
+        NUM_QUERIES_CANCELLED: Value("i", 0),
+        NUM_QUERIES_TIMEDOUT: Value("i", 0),
+        NUM_RESULT_MISMATCHES: Value("i", 0),
+        NUM_OTHER_ERRORS: Value("i", 0)}
 
   def connect(self):
     self.impalad_conn = self.impalad.impala.connect(impalad=self.impalad)
@@ -1157,8 +1251,7 @@ class QueryRunner(object):
       if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
                                                       'INITIALIZED_STATE')):
         started_running_or_cancelled = True
-        if self.stress_runner:
-          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
       # Return if we're ready to fetch results (in the FINISHED state) or we are in
       # another terminal state like EXCEPTION.
       if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
@@ -1168,8 +1261,8 @@ class QueryRunner(object):
         if not should_cancel:
           fetch_and_set_profile(cursor, report)
         self._cancel(cursor, report)
-        if not started_running_or_cancelled and self.stress_runner:
-          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        if not started_running_or_cancelled:
+          increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
         return False
       if secs_since_log > 5:
         secs_since_log = 0
@@ -1177,6 +1270,20 @@ class QueryRunner(object):
       sleep(sleep_secs)
       secs_since_log += sleep_secs
 
+  def update_from_query_report(self, report):
+    LOG.debug("Updating runtime stats (Query Runner PID: {0})".format(self.proc.pid))
+    increment(self._metrics[NUM_QUERIES_FINISHED])
+    if report.not_enough_memory:
+      increment(self._metrics[NUM_QUERIES_EXCEEDED_MEM_LIMIT])
+    if report.ac_rejected:
+      increment(self._metrics[NUM_QUERIES_AC_REJECTED])
+    if report.ac_timedout:
+      increment(self._metrics[NUM_QUERIES_AC_TIMEDOUT])
+    if report.was_cancelled:
+      increment(self._metrics[NUM_QUERIES_CANCELLED])
+    if report.timed_out:
+      increment(self._metrics[NUM_QUERIES_TIMEDOUT])
+
   def _cancel(self, cursor, report):
     report.timed_out = True