You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/10/08 17:11:59 UTC

[3/6] impala git commit: IMPALA-7643: report # queries actually executing in stress test

IMPALA-7643: report # queries actually executing in stress test

With admission control it's interesting to separate out two categories
of queries:
1. Queries that have started up and are executing
2. Queries that have not made it that far yet, e.g. are waiting to
  establish a client connection (hitting --fe_service_threads limit),
  are in the planner, are queued in admission control or are starting
  up.

We now call 1+2 "Active" and 1 "Executing".

Example output:
Done | Active | Executing | Mem Lmt Ex | AC Reject | AC Timeout | Time Out | Cancel | Err | Incorrect | Next Qry Mem Lmt | Tot Qry Mem Lmt | Tracked Mem | RSS Mem
   0 |      0 |         0 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |                0 |               0 |             |
   0 |     10 |         3 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |              510 |            3922 |         158 |    4541
   0 |     20 |        10 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |              390 |            8534 |         570 |    4517

Refactored QueryRunner.run_query() to reduce nesting and make it more
readable.

Testing:
Ran local stress tests with and without --test_admission_control set.

Change-Id: I5692e8e5ba3224becefc24437197bf5a5b450335
Reviewed-on: http://gerrit.cloudera.org:8080/11587
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/81c58d5d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/81c58d5d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/81c58d5d

Branch: refs/heads/master
Commit: 81c58d5de0d0295b5535ff15afb284bccb6b0026
Parents: d3db326
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Oct 2 17:26:51 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Oct 6 03:13:42 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 175 ++++++++++++++++++++++-----------
 1 file changed, 117 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/81c58d5d/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 80a2386..844c245 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -427,9 +427,9 @@ class StressRunner(object):
     self._mem_mb_needed_for_next_query = Value("i", 0)
 
     # This lock provides a way to stop new queries from running. This lock must be
-    # acquired before writing to _num_queries_started. Before query submission
-    # _num_queries_started must be incremented. Reading _num_queries_started is allowed
-    # without taking this lock.
+    # acquired before writing to _num_queries_submitted. Before query submission
+    # _num_queries_submitted must be incremented. Reading _num_queries_submitted is
+    # allowed without taking this lock.
     self._submit_query_lock = Lock()
 
     self.leak_check_interval_mins = None
@@ -439,7 +439,11 @@ class StressRunner(object):
 
     # All values below are cumulative.
     self._num_queries_dequeued = Value("i", 0)
-    self._num_queries_started = Value("i", 0)
+    # The number of queries that were submitted to a query runner.
+    self._num_queries_submitted = Value("i", 0)
+    # The number of queries that have entered the RUNNING state (i.e. got through Impala's
+    # admission control and started executing) or were cancelled or hit an error.
+    self._num_queries_started_running_or_cancelled = Value("i", 0)
     self._num_queries_finished = Value("i", 0)
     self._num_queries_exceeded_mem_limit = Value("i", 0)
     self._num_queries_ac_rejected = Value("i", 0)
@@ -458,9 +462,9 @@ class StressRunner(object):
     self.results_dir = gettempdir()
 
     self._status_headers = [
-        "Done", "Running", "Mem Lmt Ex", "AC Reject", "AC Timeout", "Time Out", "Cancel",
-        "Err", "Incorrect", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem",
-        "RSS Mem"]
+        "Done", "Active", "Executing", "Mem Lmt Ex", "AC Reject", "AC Timeout",
+        "Time Out", "Cancel", "Err", "Incorrect", "Next Qry Mem Lmt",
+        "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
 
     self._num_queries_to_run = None
     self._query_producer_thread = None
@@ -563,11 +567,11 @@ class StressRunner(object):
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
       try:
-        while self._num_queries_started.value < self._num_queries_to_run:
+        while self._num_queries_submitted.value < self._num_queries_to_run:
           sleep(1.0 / self.startup_queries_per_sec)
           # Remember num dequeued/started are cumulative.
           with self._submit_query_lock:
-            if self._num_queries_dequeued.value != self._num_queries_started.value:
+            if self._num_queries_dequeued.value != self._num_queries_submitted.value:
               # Assume dequeued queries are stuck waiting for cluster resources so there
               # is no point in starting an additional runner.
               continue
@@ -597,10 +601,10 @@ class StressRunner(object):
       # while no queries were running.
       ready_to_unlock = None
       try:
-        while self._num_queries_started.value < self._num_queries_to_run:
+        while self._num_queries_submitted.value < self._num_queries_to_run:
           if ready_to_unlock:
             assert query_sumbission_is_locked, "Query submission not yet locked"
-            assert not self._num_queries_running, "Queries are still running"
+            assert not self._num_queries_active, "Queries are still running"
             LOG.debug("Resuming query submission")
             self._next_leak_check_unix_time.value = int(
                 time() + 60 * self.leak_check_interval_mins)
@@ -613,7 +617,7 @@ class StressRunner(object):
               self.leak_check_interval_mins and
               time() > self._next_leak_check_unix_time.value
           ):
-            assert self._num_queries_running <= len(self._query_runners), \
+            assert self._num_queries_active <= len(self._query_runners), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
             self._submit_query_lock.acquire()
@@ -638,7 +642,7 @@ class StressRunner(object):
             max_actual = -1
           self._set_mem_usage_values(max_reported, max_actual)
 
-          if query_sumbission_is_locked and not self._num_queries_running:
+          if query_sumbission_is_locked and not self._num_queries_active:
             if ready_to_unlock is None:
               ready_to_unlock = False
             else:
@@ -670,17 +674,33 @@ class StressRunner(object):
         self._max_mem_mb_usage.value = actual
 
   @property
-  def _num_queries_running(self):
-    num_running = self._num_queries_started.value - self._num_queries_finished.value
+  def _num_queries_active(self):
+    """The number of queries that are currently active (i.e. submitted to a query runner
+    and haven't yet completed)."""
+    num_running = self._num_queries_submitted.value - self._num_queries_finished.value
     assert num_running >= 0, "The number of running queries is negative"
     return num_running
 
+  @property
+  def _num_queries_executing(self):
+    """The number of queries that are currently executing (i.e. entered the RUNNING state
+    and haven't yet completed)."""
+    num_executing = (self._num_queries_started_running_or_cancelled.value -
+        self._num_queries_finished.value)
+    assert num_executing >= 0, "The number of executing queries is negative"
+    return num_executing
+
+  def increment_num_queries_started_running_or_cancelled(self):
+    """Called by query runner to increment _num_queries_started_running_or_cancelled."""
+    increment(self._num_queries_started_running_or_cancelled)
+
+
   def _start_single_runner(self, impalad):
     """Consumer function to take a query of the queue and run it. This is intended to
     run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
-    runner = QueryRunner()
+    runner = QueryRunner(self)
     runner.impalad = impalad
     runner.results_dir = self.results_dir
     runner.use_kerberos = self.use_kerberos
@@ -714,7 +734,7 @@ class StressRunner(object):
         solo_runtime = query.solo_runtime_secs_with_spilling
 
       LOG.debug("Waiting for other query runners to start their queries")
-      while query_idx > self._num_queries_started.value:
+      while query_idx > self._num_queries_submitted.value:
         sleep(0.1)
 
       self._mem_mb_needed_for_next_query.value = mem_limit
@@ -723,13 +743,13 @@ class StressRunner(object):
       with self._mem_broker.reserve_mem_mb(mem_limit) as reservation_id:
         LOG.debug("Received memory reservation")
         with self._submit_query_lock:
-          increment(self._num_queries_started)
+          increment(self._num_queries_submitted)
         should_cancel = self.cancel_probability > random()
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
           timeout = solo_runtime * max(
-              10, self._num_queries_started.value - self._num_queries_finished.value)
+              10, self._num_queries_submitted.value - self._num_queries_finished.value)
         report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
@@ -811,8 +831,10 @@ class StressRunner(object):
     print(status_format % (
         # Done
         self._num_queries_finished.value,
-        # Running
-        self._num_queries_started.value - self._num_queries_finished.value,
+        # Active
+        self._num_queries_active,
+        # Executing
+        self._num_queries_executing,
         # Mem Lmt Ex
         self._num_queries_exceeded_mem_limit.value,
         # AC Rejected
@@ -1010,7 +1032,11 @@ class QueryRunner(object):
   SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
-  def __init__(self):
+  def __init__(self, stress_runner=None):
+    """Creates a new instance. The caller must fill in the below fields. stress_runner
+    must be provided if this is running in the context of a stress run, so that statistics
+    can be updated."""
+    self.stress_runner = stress_runner
     self.impalad = None
     self.impalad_conn = None
     self.use_kerberos = False
@@ -1043,29 +1069,7 @@ class QueryRunner(object):
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
-        if query.db_name:
-          LOG.debug("Using %s database", query.db_name)
-          cursor.execute("USE %s" % query.db_name)
-        if run_set_up and query.set_up_sql:
-          LOG.debug("Running set up query:\n%s", self.set_up_sql)
-          cursor.execute(query.set_up_sql)
-        for query_option, value in self.common_query_options.iteritems():
-          cursor.execute(
-              "SET {query_option}={value}".format(query_option=query_option, value=value))
-        for query_option, value in query.options.iteritems():
-          cursor.execute(
-              "SET {query_option}={value}".format(query_option=query_option, value=value))
-        cursor.execute("SET ABORT_ON_ERROR=1")
-        if self.test_admission_control:
-          LOG.debug(
-              "Running query without mem limit at %s with timeout secs %s:\n%s",
-              self.impalad.host_name, timeout_secs, query.sql)
-        else:
-          LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
-          cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
-          LOG.debug(
-              "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
-              mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+        self._set_db_and_options(cursor, query, run_set_up, mem_limit_mb, timeout_secs)
         error = None
         try:
           cursor.execute_async(
@@ -1074,19 +1078,10 @@ class QueryRunner(object):
           report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
                                                   cursor._last_operation else None)
           LOG.debug("Query id is %s", report.query_id)
-          sleep_secs = 0.1
-          secs_since_log = 0
-          while cursor.is_executing():
-            if time() > timeout_unix_time:
-              if not should_cancel:
-                fetch_and_set_profile(cursor, report)
-              self._cancel(cursor, report)
-              return report
-            if secs_since_log > 5:
-              secs_since_log = 0
-              LOG.debug("Waiting for query to execute")
-            sleep(sleep_secs)
-            secs_since_log += sleep_secs
+          if not self._wait_until_fetchable(cursor, report, timeout_unix_time,
+                                            should_cancel):
+            return report
+
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
@@ -1118,6 +1113,70 @@ class QueryRunner(object):
       report.other_error = error
     return report
 
+  def _set_db_and_options(self, cursor, query, run_set_up, mem_limit_mb, timeout_secs):
+    """Set up a new cursor for running a query by switching to the correct database and
+    setting query options."""
+    if query.db_name:
+      LOG.debug("Using %s database", query.db_name)
+      cursor.execute("USE %s" % query.db_name)
+    if run_set_up and query.set_up_sql:
+      LOG.debug("Running set up query:\n%s", query.set_up_sql)
+      cursor.execute(query.set_up_sql)
+    for query_option, value in self.common_query_options.iteritems():
+      cursor.execute(
+          "SET {query_option}={value}".format(query_option=query_option, value=value))
+    for query_option, value in query.options.iteritems():
+      cursor.execute(
+          "SET {query_option}={value}".format(query_option=query_option, value=value))
+    cursor.execute("SET ABORT_ON_ERROR=1")
+    if self.test_admission_control:
+      LOG.debug(
+          "Running query without mem limit at %s with timeout secs %s:\n%s",
+          self.impalad.host_name, timeout_secs, query.sql)
+    else:
+      LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
+      cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
+      LOG.debug(
+          "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+          mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+
+  def _wait_until_fetchable(self, cursor, report, timeout_unix_time, should_cancel):
+    """Wait up until timeout_unix_time until the query results can be fetched (if it's
+    a SELECT query) or until it has finished executing (if it's a different query type
+    like DML). If the timeout expires we either cancel the query or report the timeout.
+    Return True in the first case or False in the second (timeout) case."""
+    # Loop until the query gets to the right state or a timeout expires.
+    sleep_secs = 0.1
+    secs_since_log = 0
+    # True if we incremented num_queries_started_running_or_cancelled for this query.
+    started_running_or_cancelled = False
+    while True:
+      query_state = cursor.status()
+      # Check if the query got past the PENDING/INITIALIZED states, either because
+      # it's executing or hit an error.
+      if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
+                                                      'INITIALIZED_STATE')):
+        started_running_or_cancelled = True
+        if self.stress_runner:
+          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+      # Return if we're ready to fetch results (in the FINISHED state) or we are in
+      # another terminal state like EXCEPTION.
+      if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
+        return True
+
+      if time() > timeout_unix_time:
+        if not should_cancel:
+          fetch_and_set_profile(cursor, report)
+        self._cancel(cursor, report)
+        if not started_running_or_cancelled and self.stress_runner:
+          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        return False
+      if secs_since_log > 5:
+        secs_since_log = 0
+        LOG.debug("Waiting for query to execute")
+      sleep(sleep_secs)
+      secs_since_log += sleep_secs
+
   def _cancel(self, cursor, report):
     report.timed_out = True