You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/14 21:46:56 UTC

[1/2] impala git commit: IMPALA-7440: remove --nlj-filter from stress test

Repository: impala
Updated Branches:
  refs/heads/master c0ff4fe8f -> 2a2c3daaa


IMPALA-7440: remove --nlj-filter from stress test

This hides the option and makes it a no-op to avoid breaking
any driver scripts that pass in the option.

Testing:
Started a local stress test with and without the option set, and confirmed
that it started running queries in both cases.

Change-Id: I51dafdfc9dfc9d732cc4fe42aef5719a0593ef1f
Reviewed-on: http://gerrit.cloudera.org:8080/11212
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/7b706750
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/7b706750
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/7b706750

Branch: refs/heads/master
Commit: 7b706750bcb4e3b5009f82b79c135a399173f5e6
Parents: c0ff4fe
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 14 09:56:54 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Aug 14 21:44:56 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 30 ++----------------------------
 1 file changed, 2 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/7b706750/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 7754e74..55c3a42 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,7 +62,7 @@ import sys
 import threading
 import traceback
 from Queue import Empty   # Must be before Queue below
-from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, SUPPRESS
 from collections import defaultdict
 from contextlib import contextmanager
 from datetime import datetime
@@ -2051,10 +2051,7 @@ def main():
   parser.add_argument(
       "--cancel-probability", type=float, default=0.1,
       help="The probability a query will be cancelled.")
-  parser.add_argument(
-      "--nlj-filter", choices=("in", "out", None),
-      help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries"
-      " will be used. The default is to not filter either way.")
+  parser.add_argument("--nlj-filter", help=SUPPRESS) # Made a no-op by IMPALA-7440.
   parser.add_argument(
       "--common-query-options", default=None, nargs="*",
       help="Space-delimited string of query options and values. This is a freeform "
@@ -2212,29 +2209,6 @@ def main():
       LOG.debug("Filtering query that exceeds --filter-query-mem-ratio: " + query.sql)
       del queries[idx]
 
-  # Remove queries that have a nested loop join in the plan.
-  if args.nlj_filter:
-    with impala.cursor(db_name=args.random_db) as cursor:
-      for idx in xrange(len(queries) - 1, -1, -1):
-        query = queries[idx]
-        if query.db_name:
-          cursor.execute("USE %s" % query.db_name)
-        cursor.execute("EXPLAIN " + query.sql)
-        for row in cursor.fetchall():
-          found_nlj = False
-          for col in row:
-            col = str(col).lower()
-            if "nested loop join" in col:
-              found_nlj = True
-              if args.nlj_filter == "out":
-                del queries[idx]
-              break
-          if found_nlj:
-            break
-        else:
-          if args.nlj_filter == "in":
-            del queries[idx]
-
   if len(queries) == 0:
     raise Exception("All queries were filtered")
   print("Using %s queries" % len(queries))


[2/2] impala git commit: IMPALA-7356 (part 1 of ?): admission control stress

Posted by ta...@apache.org.
IMPALA-7356 (part 1 of ?): admission control stress

Add initial support for running the stress test in a mode where
it tests that memory-based admission control prevents out-of-memory errors.
The new mode is enabled by passing --test-admission-control=true to
concurrent_select.py.

In this patch, the stress test's builtin "admission control" is still
always enabled, because it is useful in achieving a desired level of
memory overcommit. However, mem_limit is not set by the test - it
should be set by memory-based admission control - and the pass
conditions of the test are tightened up so that OOM is not allowed.
Admission control rejections and timeouts are counted separately from other
errors because they are expected when a cluster is under stress.

Future patches will likely disable or modify the behaviour of the stress
test's admission control when --test-admission-control=true.

Two columns ("AC Reject" and "AC Timeout") are added to the status output.
Here's a sample when running in the old mode (--test-admission-control=false)
against a minicluster with admission control enabled:

  Done | Running | Mem Lmt Ex | AC Reject | AC Timeout | Time Out | Cancel | Err | Incorrect | Next Qry Mem Lmt | Tot Qry Mem Lmt | Tracked Mem | RSS Mem
     0 |       0 |          0 |         0 |          0 |        0 |      0 |   0 |         0 |                0 |               0 |             |
     1 |      10 |          0 |         1 |          0 |        0 |      0 |   0 |         0 |              137 |            3954 |         147 |     452
    11 |      20 |          0 |        10 |          0 |        0 |      0 |   0 |         0 |             1521 |           12942 |         203 |     446
    15 |      30 |          0 |        14 |          0 |        0 |      0 |   0 |         0 |              130 |           18139 |        4382 |    1911
    46 |      35 |         19 |        26 |          0 |        0 |      0 |   0 |         0 |              328 |           18101 |        3266 |    2916
    98 |      36 |         47 |        50 |          0 |        0 |      0 |   0 |         0 |              270 |           18177 |        3164 |    3087

Also fixes some minor things:
* Don't fail at startup if the result hash and profile directories already
  exist.
* Remove old workarounds for incorrect error messages.

Testing:
Ran local stress tests with --test-admission-control set to true and false.

Change-Id: Id31a77f1fe6854a56ce54d1de333793e18087be4
Reviewed-on: http://gerrit.cloudera.org:8080/11205
Reviewed-by: Michael Brown <mi...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2a2c3daa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2a2c3daa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2a2c3daa

Branch: refs/heads/master
Commit: 2a2c3daaa9fb64b148114cb3fd9bfdc6bb4b37eb
Parents: 7b70675
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 6 17:33:01 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Aug 14 21:46:26 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 133 +++++++++++++++++++++------------
 1 file changed, 85 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2a2c3daa/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 55c3a42..64e06e1 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -17,8 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# This module is used to stress test Impala by running queries concurrently. Only SELECT
-# queries are used.
+# This module is used to stress test Impala by running queries concurrently.
 #
 # Stress test outline (and notes):
 #  1) Get a set of queries as requested by the user from the CLI options.
@@ -35,19 +34,24 @@
 #     not required.
 #  4) Optionally, set an amount of memory that can be overcommitted. Overcommitting
 #     memory can increase memory pressure which can result in memory being spilled to
-#     disk.
+#     disk or queries failing with out-of-memory.
 #  5) Start submitting queries. There are two modes for throttling the number of
-#     concurrent queries:
-#      a) Submit queries until all available memory (as determined by items 3 and 4) is
-#         used. Before running the query a query mem limit is set between 2a and 2b.
-#         (There is a runtime option to increase the likelihood that a query will be
-#         given the full 2a limit to avoid spilling.)
-#      b) TODO: Use admission control.
+#     concurrent queries, depending on --test-admission-control.
+#      a) test-admission-control=false: Submit queries until all available memory (as
+#         determined by items 3 and 4) is used. Before running the query a query mem
+#         limit is set between 2a and 2b. (There is a runtime option to increase the
+#         likelihood that a query will be given the full 2a limit to avoid spilling.)
+#      b) test-admission-control=true: Submit enough queries to achieve the desired
+#         level of overcommit, but expect that Impala's admission control will throttle
+#         queries. In this mode mem_limit is not set per query.
 #  6) Randomly cancel queries to test cancellation. There is a runtime option to control
 #     the likelihood that a query will be randomly canceled.
-#  7) If a query errored, verify that memory was overcommitted during execution and the
-#     error is a mem limit exceeded error. There is no other reason a query should error
-#     and any such error will cause the stress test to stop.
+#  7) If a query errored, verify that the error is expected. Errors are expected in the
+#     following cases:
+#      a) Memory-based admission control is not being tested (i.e.
+#        --test-admission-control=false), the error is an out-of-memory error and memory
+#        on the cluster is overcommitted.
+#      b) The error is an admission control rejection or timeout.
 #  8) Verify the result set hash of successful queries if there are no DML queries in the
 #     current run.
 
@@ -250,12 +254,20 @@ class QueryReport(object):
     self.runtime_secs = None
     self.mem_was_spilled = False
     self.mem_limit_exceeded = False
-    self.non_mem_limit_error = None
+    self.ac_rejected = False
+    self.ac_timedout = False
+    self.other_error = None
     self.timed_out = False
     self.was_cancelled = False
     self.profile = None
     self.query_id = None
 
+  def has_query_error(self):
+    """Return true if any kind of error status was returned from the query (i.e.
+    the query didn't run to completion, time out or get cancelled)."""
+    return (self.mem_limit_exceeded or self.ac_rejected or self.ac_timedout
+            or self.other_error)
+
   def write_query_profile(self, directory, prefix=None):
     """
     Write out the query profile bound to this object to a given directory.
@@ -392,6 +404,7 @@ class StressRunner(object):
   def __init__(self):
     self.use_kerberos = False
     self.common_query_options = {}
+    self.test_admission_control = False
     self._mem_broker = None
     self._verify_results = True
     self._select_probability = None
@@ -418,6 +431,8 @@ class StressRunner(object):
     self._num_queries_started = Value("i", 0)
     self._num_queries_finished = Value("i", 0)
     self._num_queries_exceeded_mem_limit = Value("i", 0)
+    self._num_queries_ac_rejected = Value("i", 0)
+    self._num_queries_ac_timedout = Value("i", 0)
     self._num_queries_cancelled = Value("i", 0)
     self._num_queries_timedout = Value("i", 0)
     self._num_result_mismatches = Value("i", 0)
@@ -432,7 +447,7 @@ class StressRunner(object):
     self.results_dir = gettempdir()
 
     self._status_headers = [
-        "Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
+        "Done", "Running", "Mem Lmt Ex", "AC Reject", "AC Timeout", "Time Out", "Cancel",
         "Err", "Incorrect", "Next Qry Mem Lmt", "Tot Qry Mem Lmt", "Tracked Mem",
         "RSS Mem"]
 
@@ -656,6 +671,7 @@ class StressRunner(object):
     runner.results_dir = self.results_dir
     runner.use_kerberos = self.use_kerberos
     runner.common_query_options = self.common_query_options
+    runner.test_admission_control = self.test_admission_control
     runner.connect()
 
     while not self._query_queue.empty():
@@ -705,8 +721,8 @@ class StressRunner(object):
         if report.timed_out and should_cancel:
           report.was_cancelled = True
         self._update_from_query_report(report)
-        if report.non_mem_limit_error:
-          error_msg = str(report.non_mem_limit_error)
+        if report.other_error:
+          error_msg = str(report.other_error)
           # There is a possible race during cancellation. If a fetch request fails (for
           # example due to hitting a mem limit), just before the cancellation request, the
           # server may have already unregistered the query as part of the fetch failure.
@@ -740,8 +756,8 @@ class StressRunner(object):
               id=report.query_id,
               mesg=error_msg))
         if (
-            report.mem_limit_exceeded and
-            not self._mem_broker.was_overcommitted(reservation_id)
+            report.mem_limit_exceeded and (self.test_admission_control or
+            not self._mem_broker.was_overcommitted(reservation_id))
         ):
           increment(self._num_successive_errors)
           self._write_query_profile(
@@ -749,8 +765,7 @@ class StressRunner(object):
           raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
                           "Query ID: {0}".format(report.query_id))
         if (
-            not report.mem_limit_exceeded and
-            not report.timed_out and
+            not report.timed_out and not report.has_query_error() and
             (self._verify_results and report.result_hash != query.result_hash)
         ):
           increment(self._num_successive_errors)
@@ -786,6 +801,10 @@ class StressRunner(object):
         self._num_queries_started.value - self._num_queries_finished.value,
         # Mem Lmt Ex
         self._num_queries_exceeded_mem_limit.value,
+        # AC Rejected
+        self._num_queries_ac_rejected.value,
+        # AC Timed Out
+        self._num_queries_ac_timedout.value,
         # Time Out
         self._num_queries_timedout.value - self._num_queries_cancelled.value,
         # Cancel
@@ -808,6 +827,10 @@ class StressRunner(object):
     increment(self._num_queries_finished)
     if report.mem_limit_exceeded:
       increment(self._num_queries_exceeded_mem_limit)
+    if report.ac_rejected:
+      increment(self._num_queries_ac_rejected)
+    if report.ac_timedout:
+      increment(self._num_queries_ac_timedout)
     if report.was_cancelled:
       increment(self._num_queries_cancelled)
     if report.timed_out:
@@ -1019,11 +1042,16 @@ class QueryRunner(object):
           cursor.execute(
               "SET {query_option}={value}".format(query_option=query_option, value=value))
         cursor.execute("SET ABORT_ON_ERROR=1")
-        LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
-        cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
-        LOG.debug(
-            "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
-            mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
+        if self.test_admission_control:
+          LOG.debug(
+              "Running query without mem limit at %s with timeout secs %s:\n%s",
+              self.impalad.host_name, timeout_secs, query.sql)
+        else:
+          LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
+          cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
+          LOG.debug(
+              "Running query with %s MB mem limit at %s with timeout secs %s:\n%s",
+              mem_limit_mb, self.impalad.host_name, timeout_secs, query.sql)
         error = None
         try:
           cursor.execute_async(
@@ -1061,8 +1089,8 @@ class QueryRunner(object):
           report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
                                                   cursor._last_operation else None)
           LOG.debug("Error running query with id %s: %s", report.query_id, error)
-          self._check_for_mem_limit_exceeded(report, cursor, error)
-        if report.non_mem_limit_error or report.mem_limit_exceeded:
+          self._check_for_memory_errors(report, cursor, error)
+        if report.has_query_error():
           return report
         report.runtime_secs = time() - start_time
         if cursor.execution_failed() or self.check_if_mem_was_spilled:
@@ -1073,7 +1101,7 @@ class QueryRunner(object):
           report.mem_limit_exceeded = "Memory limit exceeded" in report.profile
     except Exception as error:
       # A mem limit error would have been caught above, no need to check for that here.
-      report.non_mem_limit_error = error
+      report.other_error = error
     return report
 
   def _cancel(self, cursor, report):
@@ -1095,13 +1123,19 @@ class QueryRunner(object):
         LOG.debug("Error cancelling query %s through the web server: %s",
                   report.query_id, e)
 
-  def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
+  def _check_for_memory_errors(self, report, cursor, caught_exception):
     """To be called after a query failure to check for signs of failed due to a
-    mem limit. The report will be updated accordingly.
+    mem limit or admission control rejection/timeout. The report will be updated
+    accordingly.
     """
     fetch_and_set_profile(cursor, report)
     caught_msg = str(caught_exception).lower().strip()
-
+    if "rejected query from pool" in caught_msg:
+      report.ac_rejected = True
+      return
+    if "admission for query exceeded timeout" in caught_msg:
+      report.ac_timedout = True
+      return
     if "memory limit exceeded" in caught_msg or \
        "repartitioning did not reduce the size of a spilled partition" in caught_msg or \
        "failed to get minimum memory reservation" in caught_msg or \
@@ -1110,20 +1144,9 @@ class QueryRunner(object):
       report.mem_limit_exceeded = True
       return
 
-    # If the mem limit is very low and abort_on_error is enabled, the message from
-    # exceeding the mem_limit could be something like:
-    #   Metadata states that in group hdfs://<node>:8020<path> there are <X> rows,
-    #   but only <Y> rows were read.
-    if (
-        "metadata states that in group" in caught_msg and
-        "rows were read" in caught_msg
-    ):
-      report.mem_limit_exceeded = True
-      return
-
     LOG.debug("Non-mem limit error for query with id %s: %s", report.query_id,
               caught_exception, exc_info=True)
-    report.non_mem_limit_error = caught_exception
+    report.other_error = caught_exception
 
   def _hash_result(self, cursor, timeout_unix_time, query):
     """Returns a hash that is independent of row order. 'query' is only used for debug
@@ -1312,6 +1335,7 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
   runner.common_query_options = converted_args.common_query_options
+  runner.test_admission_control = converted_args.test_admission_control
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
   runner.use_kerberos = converted_args.use_kerberos
@@ -1360,14 +1384,14 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
             os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise QueryTimeout(
             "query {0} timed out during binary search".format(query.logical_query_id))
-      if report.non_mem_limit_error:
+      if report.other_error:
         report.write_query_profile(
             os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise Exception(
             "query {0} errored during binary search: {1}".format(
-                query.logical_query_id, str(report.non_mem_limit_error)))
+                query.logical_query_id, str(report.other_error)))
       LOG.debug("Spilled: %s" % report.mem_was_spilled)
-      if not report.mem_limit_exceeded:
+      if not report.has_query_error():
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
@@ -2058,6 +2082,14 @@ def main():
       "string with little regard to whether you've spelled the query options correctly "
       "or set valid values. Example: --common-query-options "
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
+  parser.add_argument(
+      "--test-admission-control", type=bool, default=False,
+      help="If true, assume that the Impala cluster under test is using memory-based "
+      "admission control and should not admit queries that cannot be run to completion. "
+      "In this mode the stress runner does not set mem_limit on queries and "
+      "out-of-memory errors are not expected in this mode so will fail the stress test "
+      "if encountered. The stress runner still tracks the 'admitted' memory so that "
+      "it can try to submit more queries than there is available memory for.")
   args = parser.parse_args()
   converted_args = StressArgConverter(args)
 
@@ -2075,8 +2107,12 @@ def main():
         "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")
 
-  os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
-  os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
+  result_hashes_path = os.path.join(args.results_dir, RESULT_HASHES_DIR)
+  if not os.path.isdir(result_hashes_path):
+    os.makedirs(result_hashes_path)
+  results_dir_path = os.path.join(args.results_dir, PROFILES_DIR)
+  if not os.path.isdir(results_dir_path):
+    os.makedirs(results_dir_path)
 
   cluster = cli_options.create_cluster(args)
   impala = cluster.impala
@@ -2231,6 +2267,7 @@ def main():
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
   stress_runner.common_query_options = converted_args.common_query_options
+  stress_runner.test_admission_control = converted_args.test_admission_control
   stress_runner.run_queries(
       queries, impala, args.max_queries, args.mem_overcommit_pct,
       should_print_status=not args.no_status,