Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 05:00:46 UTC

[impala] branch master updated (257fa0c -> 57ce2bd)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 257fa0c  IMPALA-8209: Include fragment instance ID in memz/ breakdown
     new 9541452  IMPALA-6662: Make stress test resilient to hangs due to client crashes
     new 540278d  IMPALA-7450. Set thread name during refresh/load operations
     new 57ce2bd  IMPALA-8222: disable per-query timeouts in stress test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impala/catalog/CatalogServiceCatalog.java      |  76 +++--
 .../java/org/apache/impala/catalog/HdfsTable.java  |  25 +-
 .../org/apache/impala/catalog/TableLoader.java     |  14 +-
 .../apache/impala/util/ThreadNameAnnotator.java    |  62 ++++
 .../impala/util/ThreadNameAnnotatorTest.java       | 158 +++++++++
 tests/stress/concurrent_select.py                  | 352 +++++++++++++--------
 6 files changed, 515 insertions(+), 172 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/util/ThreadNameAnnotator.java
 create mode 100644 fe/src/test/java/org/apache/impala/util/ThreadNameAnnotatorTest.java


[impala] 01/03: IMPALA-6662: Make stress test resilient to hangs due to client crashes

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 95414528199011716af0c55ac9c11eb69fb442b7
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Feb 19 00:33:23 2019 -0800

    IMPALA-6662: Make stress test resilient to hangs due to client crashes
    
    Thanks to Sailesh Mukil for the initial version of this patch.
    
    The concurrent_select.py process starts multiple subprocesses
    (called query runners) to run the queries. It also starts two threads,
    the query producer thread and the query consumer thread. The query
    producer thread adds queries to a query queue, and the query consumer
    thread pulls them off the queue and feeds them to the query runners.
    
    The query runner, once it gets queries, does the following:
    ...
      with _submit_query_lock:
        increment(num_queries_started)
      run_query()    # One runner crashes here.
      increment(num_queries_finished)
    ...
    
    One of the runners crashes inside run_query() and therefore never
    increments num_queries_finished.
    
    Another thread that's supposed to check for memory leaks (but actually
    doesn't) periodically acquires '_submit_query_lock' and waits for the
    number of running queries to reach 0 before releasing the lock.
    
    However, in the above case, the number of running queries will never
    reach 0, because one of the query runners crashed without incrementing
    'num_queries_finished'. As a result, the poll_mem_usage() function holds
    the lock indefinitely, so no new queries can be submitted and the stress
    test never finishes.
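
    In simplified form, the hang looks like this (an illustrative sketch, not
    the literal poll_mem_usage() code):

      with self._submit_query_lock:           # blocks all new query submissions
        while self._num_queries_active > 0:   # never reaches 0: one runner died
          sleep(sleep_secs)                   # mid-query, so the lock is held forever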
    
    This patch fixes the problem by moving the global trackers such as
    num_queries_started and num_queries_finished to a per-QueryRunner basis.
    Any time we want the total number of queries started/finished/cancelled,
    etc., we aggregate the values from all the runners. Access is
    synchronized by a new lock, _query_runners_lock.
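
    The per-runner counters and their aggregation look roughly like this (a
    condensed sketch of the code in the diff below):

      # Each QueryRunner owns process-shared counters readable by the parent:
      self._metrics = {NUM_QUERIES_SUBMITTED: Value("i", 0),
                       NUM_QUERIES_FINISHED: Value("i", 0)}

      # The StressRunner totals live runners plus counts saved from dead ones:
      def _calc_total_runner_metrics(self):
        totals = copy(self._past_runner_metrics)
        with self._query_runners_lock:
          for runner in self._query_runners:
            for key, val in runner._metrics.iteritems():
              totals[key] += val.value
        return totals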
    
    In _wait_for_test_to_finish(), we periodically check whether a QueryRunner
    has died. If it has, we bring its num_queries_finished up to
    num_queries_started, since it may have died before updating the 'finished'
    value, and we also count the failure as an error.
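
    For a dead runner, the reconciliation reduces to roughly the following
    (again a sketch of the code in the diff below):

      if runner.proc.exitcode is not None and runner.proc.exitcode != 0:
        num_submitted = runner._metrics[NUM_QUERIES_SUBMITTED].value
        num_finished = runner._metrics[NUM_QUERIES_FINISHED].value
        if num_submitted != num_finished:
          # The runner crashed after submitting a query but before marking it
          # finished, so close the gap and count the crash as an error.
          increment(runner._metrics[NUM_QUERIES_FINISHED])
          increment(runner._metrics[NUM_OTHER_ERRORS])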
    
    Other changes:
    * Boilerplate code is reduced by storing all metrics in a dictionary
      keyed by the metric name, instead of stamping out the code for
      10+ variables.
    * Added more comments and debug strings
    * Reformatted some code.
    
    Testing:
    Ran the stress test with the new patch locally and against a cluster.
    
    Change-Id: I525bf13e0f3dd660c0d9f5c2bf6eb292e7ebb8af
    Reviewed-on: http://gerrit.cloudera.org:8080/12521
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/stress/concurrent_select.py | 337 +++++++++++++++++++++++++-------------
 1 file changed, 222 insertions(+), 115 deletions(-)

diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 00004a3..e352d00 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -69,6 +69,7 @@ from Queue import Empty   # Must be before Queue below
 from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace, SUPPRESS
 from collections import defaultdict
 from contextlib import contextmanager
+from copy import copy
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
 from random import choice, random, randrange, shuffle
@@ -98,6 +99,22 @@ RESULT_HASHES_DIR = "result_hashes"
 # The version of the file format containing the collected query runtime info.
 RUNTIME_INFO_FILE_VERSION = 3
 
+# Metrics collected during the stress running process.
+NUM_QUERIES_DEQUEUED = "num_queries_dequeued"
+# The number of queries that were submitted to a query runner.
+NUM_QUERIES_SUBMITTED = "num_queries_submitted"
+# The number of queries that have entered the RUNNING state (i.e. got through Impala's
+# admission control and started executing) or were cancelled or hit an error.
+NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED = "num_queries_started_running_or_cancelled"
+NUM_QUERIES_FINISHED = "num_queries_finished"
+NUM_QUERIES_EXCEEDED_MEM_LIMIT = "num_queries_exceeded_mem_limit"
+NUM_QUERIES_AC_REJECTED = "num_queries_ac_rejected"
+NUM_QUERIES_AC_TIMEDOUT = "num_queries_ac_timedout"
+NUM_QUERIES_CANCELLED = "num_queries_cancelled"
+NUM_QUERIES_TIMEDOUT = "num_queries_timedout"
+NUM_RESULT_MISMATCHES = "num_result_mismatches"
+NUM_OTHER_ERRORS = "num_other_errors"
+
 
 class StressArgConverter(object):
   def __init__(self, args):
@@ -427,8 +444,8 @@ class StressRunner(object):
     self._mem_mb_needed_for_next_query = Value("i", 0)
 
     # This lock provides a way to stop new queries from running. This lock must be
-    # acquired before writing to _num_queries_submitted. Before query submission
-    # _num_queries_submitted must be incremented. Reading _num_queries_submitted is
+    # acquired before writing to the NUM_QUERIES_SUBMITTED metric for the query_runner,
+    # which is incremented before every query submission. Reading NUM_QUERIES_SUBMITTED is
     # allowed without taking this lock.
     self._submit_query_lock = Lock()
 
@@ -437,22 +454,6 @@ class StressRunner(object):
     self._max_mem_mb_reported_usage = Value("i", -1)   # -1 => Unknown
     self._max_mem_mb_usage = Value("i", -1)   # -1 => Unknown
 
-    # All values below are cumulative.
-    self._num_queries_dequeued = Value("i", 0)
-    # The number of queries that were submitted to a query runner.
-    self._num_queries_submitted = Value("i", 0)
-    # The number of queries that have entered the RUNNING state (i.e. got through Impala's
-    # admission control and started executing) or were cancelled or hit an error.
-    self._num_queries_started_running_or_cancelled = Value("i", 0)
-    self._num_queries_finished = Value("i", 0)
-    self._num_queries_exceeded_mem_limit = Value("i", 0)
-    self._num_queries_ac_rejected = Value("i", 0)
-    self._num_queries_ac_timedout = Value("i", 0)
-    self._num_queries_cancelled = Value("i", 0)
-    self._num_queries_timedout = Value("i", 0)
-    self._num_result_mismatches = Value("i", 0)
-    self._num_other_errors = Value("i", 0)
-
     self.cancel_probability = 0
     self.spill_probability = 0
 
@@ -468,10 +469,65 @@ class StressRunner(object):
 
     self._num_queries_to_run = None
     self._query_producer_thread = None
-    self._query_runners = list()
+
+    # This lock is used to synchronize access to the '_query_runners' list and also to all
+    # the '_past_runners*' members.
+    self._query_runners_lock = Lock()
+    self._query_runners = []
+
+    # These are the cumulative values of all the queries that have started/finished/-
+    # dequeued, etc. on runners that have already died. Every time we notice that a query
+    # runner has died, we update these values.
+    self._past_runner_metrics = defaultdict(lambda: 0)
+
     self._query_consumer_thread = None
     self._mem_polling_thread = None
 
+  def _record_runner_metrics_before_evict(self, query_runner):
+    """ Before removing 'query_runner' from the self._query_runners list, record its
+        metrics. Must only be called if 'query_runner' is to be removed from the list.
+        MUST hold '_query_runners_lock' before calling.
+    """
+    for key, synchronized_val in query_runner._metrics.iteritems():
+      self._past_runner_metrics[key] += synchronized_val.value
+
+  def _calc_total_runner_metrics(self):
+    """ Calculate the total of metrics across past and active query runners. """
+    totals = copy(self._past_runner_metrics)
+    with self._query_runners_lock:
+      for query_runner in self._query_runners:
+        for key, synchronized_val in query_runner._metrics.iteritems():
+          totals[key] += synchronized_val.value
+    return totals
+
+  def _calc_total_runner_metric(self, key):
+    """ Calculate the total of metric 'key' across past and active query runners. """
+    with self._query_runners_lock:
+      return self._calc_total_runner_metric_no_lock(key)
+
+  def _calc_total_runner_metric_no_lock(self, key):
+    """ TODO: Get rid of this function after reformatting how we obtain query indices.
+        _query_runners_lock MUST be taken before calling this function.
+    """
+    total = self._past_runner_metrics[key]
+    for runner in self._query_runners:
+      total += runner._metrics[key].value
+    return total
+
+  def _total_num_queries_submitted(self):
+    return self._calc_total_runner_metric(NUM_QUERIES_SUBMITTED)
+
+  def _total_num_queries_active(self):
+    """The number of queries that are currently active (i.e. submitted to a query runner
+    and haven't yet completed)."""
+    metrics = self._calc_total_runner_metrics()
+    num_running = metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED]
+    assert num_running >= 0, "The number of running queries is negative"
+    return num_running
+
+  def _num_runners_remaining(self):
+    return len(self._query_runners)
+
   def run_queries(
       self, queries, impala, num_queries_to_run, mem_overcommit_pct, should_print_status,
       verify_results, select_probability
@@ -561,17 +617,26 @@ class StressRunner(object):
         LOG.error("Error producing queries: %s", e)
         current_thread().error = e
         raise e
+      LOG.info("Producing thread completed job. Exiting...")
     self._query_producer_thread = create_and_start_daemon_thread(
         enqueue_queries, "Query Producer")
 
   def _start_consuming_queries(self, impala):
     def start_additional_runners_if_needed():
       try:
-        while self._num_queries_submitted.value < self._num_queries_to_run:
+        while self._total_num_queries_submitted() < self._num_queries_to_run:
+          # TODO: sleeping for the below amount leads to slower submission than the goal,
+          # because it does not factor in the time spent by this thread outside of the
+          # sleep() call.
           sleep(1.0 / self.startup_queries_per_sec)
           # Remember num dequeued/started are cumulative.
           with self._submit_query_lock:
-            if self._num_queries_dequeued.value != self._num_queries_submitted.value:
+            metrics = self._calc_total_runner_metrics()
+            num_dequeued = metrics[NUM_QUERIES_DEQUEUED]
+            num_submitted = metrics[NUM_QUERIES_SUBMITTED]
+            LOG.debug("Submitted {0} queries. Dequeued {1} queries".format(
+                num_submitted, num_dequeued))
+            if num_dequeued != num_submitted:
               # Assume dequeued queries are stuck waiting for cluster resources so there
               # is no point in starting an additional runner.
               continue
@@ -579,10 +644,21 @@ class StressRunner(object):
           if self.max_coordinators > 0:
             num_coordinators = min(num_coordinators, self.max_coordinators)
           impalad = impala.impalads[len(self._query_runners) % num_coordinators]
-          runner = Process(target=self._start_single_runner, args=(impalad, ))
-          runner.daemon = True
-          self._query_runners.append(runner)
-          runner.start()
+
+          query_runner = QueryRunner()
+          query_runner.impalad = impalad
+          query_runner.results_dir = self.results_dir
+          query_runner.use_kerberos = self.use_kerberos
+          query_runner.common_query_options = self.common_query_options
+          query_runner.test_admission_control = self.test_admission_control
+          query_runner.proc = \
+              Process(target=self._start_single_runner, args=(query_runner, ))
+          query_runner.proc.daemon = True
+          with self._query_runners_lock:
+            self._query_runners.append(query_runner)
+          query_runner.proc.start()
+
+        LOG.info("Consuming thread completed job. Exiting...")
       except Exception as e:
         LOG.error("Error consuming queries: %s", e)
         current_thread().error = e
@@ -601,10 +677,10 @@ class StressRunner(object):
       # while no queries were running.
       ready_to_unlock = None
       try:
-        while self._num_queries_submitted.value < self._num_queries_to_run:
+        while self._total_num_queries_submitted() < self._num_queries_to_run:
           if ready_to_unlock:
             assert query_submission_is_locked, "Query submission not yet locked"
-            assert not self._num_queries_active, "Queries are still running"
+            assert not self._total_num_queries_active(), "Queries are still running"
             LOG.debug("Resuming query submission")
             self._next_leak_check_unix_time.value = int(
                 time() + 60 * self.leak_check_interval_mins)
@@ -617,7 +693,7 @@ class StressRunner(object):
               self.leak_check_interval_mins and
               time() > self._next_leak_check_unix_time.value
           ):
-            assert self._num_queries_active <= len(self._query_runners), \
+            assert self._total_num_queries_active() <= self._num_runners_remaining(), \
                 "Each running query should belong to a runner"
             LOG.debug("Stopping query submission")
             self._submit_query_lock.acquire()
@@ -642,7 +718,7 @@ class StressRunner(object):
             max_actual = -1
           self._set_mem_usage_values(max_reported, max_actual)
 
-          if query_submission_is_locked and not self._num_queries_active:
+          if query_submission_is_locked and not self._total_num_queries_active():
             if ready_to_unlock is None:
               ready_to_unlock = False
             else:
@@ -673,40 +749,15 @@ class StressRunner(object):
         self._max_mem_mb_reported_usage.value = reported
         self._max_mem_mb_usage.value = actual
 
-  @property
-  def _num_queries_active(self):
-    """The number of queries that are currently active (i.e. submitted to a query runner
-    and haven't yet completed)."""
-    num_running = self._num_queries_submitted.value - self._num_queries_finished.value
-    assert num_running >= 0, "The number of running queries is negative"
-    return num_running
-
-  @property
-  def _num_queries_executing(self):
-    """The number of queries that are currently executing (i.e. entered the RUNNING state
-    and haven't yet completed)."""
-    num_executing = (self._num_queries_started_running_or_cancelled.value -
-        self._num_queries_finished.value)
-    assert num_executing >= 0, "The number of executing queries is negative"
-    return num_executing
-
-  def increment_num_queries_started_running_or_cancelled(self):
-    """Called by query runner to increment _num_queries_started_running_or_cancelled."""
-    increment(self._num_queries_started_running_or_cancelled)
-
-
-  def _start_single_runner(self, impalad):
+  def _start_single_runner(self, query_runner):
     """Consumer function to take a query of the queue and run it. This is intended to
     run in a separate process so validating the result set can use a full CPU.
     """
     LOG.debug("New query runner started")
-    runner = QueryRunner(self)
-    runner.impalad = impalad
-    runner.results_dir = self.results_dir
-    runner.use_kerberos = self.use_kerberos
-    runner.common_query_options = self.common_query_options
-    runner.test_admission_control = self.test_admission_control
-    runner.connect()
+
+    # The query runner should already be set up. We just need to connect() before using
+    # the runner.
+    query_runner.connect()
 
     while not self._query_queue.empty():
       try:
@@ -717,9 +768,10 @@ class StressRunner(object):
         LOG.debug("Query running aborting due to closed query queue")
         break
       LOG.debug("Getting query_idx")
-      with self._num_queries_dequeued.get_lock():
-        query_idx = self._num_queries_dequeued.value
-        self._num_queries_dequeued.value += 1
+      with self._query_runners_lock:
+        query_idx = self._calc_total_runner_metric_no_lock(NUM_QUERIES_DEQUEUED)
+        increment(query_runner._metrics[NUM_QUERIES_DEQUEUED])
+        LOG.debug("Query_idx: {0} | PID: {1}".format(query_idx, query_runner.proc.pid))
 
       if not query.required_mem_mb_without_spilling:
         mem_limit = query.required_mem_mb_with_spilling
@@ -734,7 +786,7 @@ class StressRunner(object):
         solo_runtime = query.solo_runtime_secs_with_spilling
 
       LOG.debug("Waiting for other query runners to start their queries")
-      while query_idx > self._num_queries_submitted.value:
+      while query_idx > self._total_num_queries_submitted():
         sleep(0.1)
 
       self._mem_mb_needed_for_next_query.value = mem_limit
@@ -743,18 +795,21 @@ class StressRunner(object):
       with self._mem_broker.reserve_mem_mb(mem_limit) as reservation_id:
         LOG.debug("Received memory reservation")
         with self._submit_query_lock:
-          increment(self._num_queries_submitted)
+          increment(query_runner._metrics[NUM_QUERIES_SUBMITTED])
+
         should_cancel = self.cancel_probability > random()
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
+          metrics = self._calc_total_runner_metrics()
           timeout = solo_runtime * max(
-              10, self._num_queries_submitted.value - self._num_queries_finished.value)
-        report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
+              10, metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED])
+        report = query_runner.run_query(query, timeout, mem_limit,
+            should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
           report.was_cancelled = True
-        self._update_from_query_report(report)
+        query_runner.update_from_query_report(report)
         if report.other_error:
           error_msg = str(report.other_error)
           # There is a possible race during cancellation. If a fetch request fails (for
@@ -783,7 +838,7 @@ class StressRunner(object):
             self._num_successive_errors.value = 0
             continue
           increment(self._num_successive_errors)
-          increment(self._num_other_errors)
+          increment(query_runner._metrics[NUM_OTHER_ERRORS])
           self._write_query_profile(report, PROFILES_DIR, prefix='error')
           raise Exception("Query {query} ID {id} failed: {mesg}".format(
               query=query.logical_query_id,
@@ -803,7 +858,7 @@ class StressRunner(object):
             (self._verify_results and report.result_hash != query.result_hash)
         ):
           increment(self._num_successive_errors)
-          increment(self._num_result_mismatches)
+          increment(query_runner._metrics[NUM_RESULT_MISMATCHES])
           self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
           raise Exception(dedent("""\
                                  Result hash mismatch; expected {expected}, got {actual}
@@ -818,6 +873,7 @@ class StressRunner(object):
               "Query {query} unexpectedly timed out. Query ID: {id}".format(
                   query=query.logical_query_id, id=report.query_id))
         self._num_successive_errors.value = 0
+    LOG.debug("Query runner completed...")
 
   def _print_status_header(self):
     print(" | ".join(self._status_headers))
@@ -826,29 +882,31 @@ class StressRunner(object):
     if print_header:
       self._print_status_header()
 
+    metrics = self._calc_total_runner_metrics()
     reported_mem, actual_mem = self._get_mem_usage_values(reset=True)
     status_format = " | ".join(["%%%ss" % len(header) for header in self._status_headers])
     print(status_format % (
         # Done
-        self._num_queries_finished.value,
+        metrics[NUM_QUERIES_FINISHED],
         # Active
-        self._num_queries_active,
+        metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED],
         # Executing
-        self._num_queries_executing,
+        metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED] -
+        metrics[NUM_QUERIES_FINISHED],
         # Mem Lmt Ex
-        self._num_queries_exceeded_mem_limit.value,
+        metrics[NUM_QUERIES_EXCEEDED_MEM_LIMIT],
         # AC Rejected
-        self._num_queries_ac_rejected.value,
+        metrics[NUM_QUERIES_AC_REJECTED],
         # AC Timed Out
-        self._num_queries_ac_timedout.value,
+        metrics[NUM_QUERIES_AC_TIMEDOUT],
         # Time Out
-        self._num_queries_timedout.value - self._num_queries_cancelled.value,
+        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED],
         # Cancel
-        self._num_queries_cancelled.value,
+        metrics[NUM_QUERIES_CANCELLED],
         # Err
-        self._num_other_errors.value,
+        metrics[NUM_OTHER_ERRORS],
         # Incorrect
-        self._num_result_mismatches.value,
+        metrics[NUM_RESULT_MISMATCHES],
         # Next Qry Mem Lmt
         self._mem_mb_needed_for_next_query.value,
         # Total Qry Mem Lmt
@@ -858,19 +916,6 @@ class StressRunner(object):
         # RSS Mem
         "" if actual_mem == -1 else actual_mem))
 
-  def _update_from_query_report(self, report):
-    LOG.debug("Updating runtime stats")
-    increment(self._num_queries_finished)
-    if report.not_enough_memory:
-      increment(self._num_queries_exceeded_mem_limit)
-    if report.ac_rejected:
-      increment(self._num_queries_ac_rejected)
-    if report.ac_timedout:
-      increment(self._num_queries_ac_timedout)
-    if report.was_cancelled:
-      increment(self._num_queries_cancelled)
-    if report.timed_out:
-      increment(self._num_queries_timedout)
 
   def _write_query_profile(self, report, subdir, prefix=None):
     report.write_query_profile(
@@ -886,10 +931,10 @@ class StressRunner(object):
       sys.exit(1)
 
   def _check_for_test_failure(self):
+    metrics = self._calc_total_runner_metrics()
     if (
-        self._num_other_errors.value > 0 or
-        self._num_result_mismatches.value > 0 or
-        self._num_queries_timedout.value - self._num_queries_cancelled.value > 0
+        metrics[NUM_OTHER_ERRORS] > 0 or metrics[NUM_RESULT_MISMATCHES] > 0 or
+        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED] > 0
     ):
       LOG.error("Failing the stress test due to unexpected errors, incorrect results, or "
                 "timed out queries. See the report line above for details.")
@@ -901,10 +946,11 @@ class StressRunner(object):
     lines_printed = 1
     sleep_secs = 0.1
 
+    num_runners_remaining = self._num_runners_remaining()
     while (
         self._query_producer_thread.is_alive() or
         self._query_consumer_thread.is_alive() or
-        self._query_runners
+        num_runners_remaining
     ):
       if self._query_producer_thread.error or self._query_consumer_thread.error:
         # This is bad enough to abort early. A failure here probably means there's a
@@ -912,32 +958,64 @@ class StressRunner(object):
         # not critical so is ignored.
         LOG.error("Aborting due to error in producer/consumer")
         sys.exit(1)
-      checked_for_crashes = False
-      for idx, runner in enumerate(self._query_runners):
-        if runner.exitcode is not None:
-          if runner.exitcode != 0:
-            if not checked_for_crashes:
-              LOG.info("Checking for crashes")
-              if print_crash_info_if_exists(impala, self.start_time):
-                self.print_duration()
-                sys.exit(runner.exitcode)
-              LOG.info("No crashes detected")
-              checked_for_crashes = True
-            self._check_successive_errors()
-          del self._query_runners[idx]
+      do_check_for_impala_crashes = False
+      with self._query_runners_lock:
+        for idx, runner in enumerate(self._query_runners):
+          if runner.proc.exitcode is not None:
+            if runner.proc.exitcode != 0:
+              # Since at least one query runner process failed, make sure to check for
+              # crashed impalads.
+              do_check_for_impala_crashes = True
+              # TODO: Handle case for num_queries_dequeued != num_queries_submitted
+              num_submitted = runner._metrics[NUM_QUERIES_SUBMITTED].value
+              num_finished = runner._metrics[NUM_QUERIES_FINISHED].value
+              if num_submitted != num_finished:
+                # The query runner process may have crashed before updating the number
+                # of finished queries but after it incremented the number of queries
+                # submitted.
+                assert num_submitted - num_finished == 1
+                increment(runner._metrics[NUM_QUERIES_FINISHED])
+                # Since we know that the runner crashed while trying to run a query, we
+                # count it as an 'other error'
+                increment(runner._metrics[NUM_OTHER_ERRORS])
+              self._check_successive_errors()
+            assert runner._metrics[NUM_QUERIES_SUBMITTED].value == \
+                    runner._metrics[NUM_QUERIES_FINISHED].value, \
+                    str([(k, v.value) for k, v in runner._metrics.iteritems()])
+            # Make sure to record all the metrics before removing this runner from the
+            # list.
+            print("Query runner ({0}) exited with exit code {1}".format(
+                runner.proc.pid, runner.proc.exitcode))
+            self._record_runner_metrics_before_evict(self._query_runners[idx])
+
+            # Remove the query runner from the list.
+            del self._query_runners[idx]
+
+      if do_check_for_impala_crashes:
+        # Since we know that at least one query runner failed, check if any of the Impala
+        # daemons themselves crashed.
+        LOG.info("Checking for Impala crashes")
+        if print_crash_info_if_exists(impala, self.start_time):
+          self.print_duration()
+          sys.exit(runner.proc.exitcode)
+        do_check_for_impala_crashes = False
+        LOG.info("No Impala crashes detected")
+
       sleep(sleep_secs)
+      num_runners_remaining = self._num_runners_remaining()
+
       if should_print_status:
         last_report_secs += sleep_secs
         if last_report_secs > 5:
           if (
               not self._query_producer_thread.is_alive() or
               not self._query_consumer_thread.is_alive() or
-              not self._query_runners
+              not num_runners_remaining
           ):
             LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
             LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
             LOG.debug("Queue size: %s" % self._query_queue.qsize())
-            LOG.debug("Runners: %s" % len(self._query_runners))
+            LOG.debug("Runners: %s" % num_runners_remaining)
           last_report_secs = 0
           lines_printed %= 50
           self._print_status(print_header=(lines_printed == 0))
@@ -1043,6 +1121,22 @@ class QueryRunner(object):
     self.results_dir = gettempdir()
     self.check_if_mem_was_spilled = False
     self.common_query_options = {}
+    self.proc = None
+
+    # All these values are shared values between processes. We want these to be accessible
+    # by the parent process that started this QueryRunner, for operational purposes.
+    self._metrics = {
+        NUM_QUERIES_DEQUEUED: Value("i", 0),
+        NUM_QUERIES_SUBMITTED: Value("i", 0),
+        NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED: Value("i", 0),
+        NUM_QUERIES_FINISHED: Value("i", 0),
+        NUM_QUERIES_EXCEEDED_MEM_LIMIT: Value("i", 0),
+        NUM_QUERIES_AC_REJECTED: Value("i", 0),
+        NUM_QUERIES_AC_TIMEDOUT: Value("i", 0),
+        NUM_QUERIES_CANCELLED: Value("i", 0),
+        NUM_QUERIES_TIMEDOUT: Value("i", 0),
+        NUM_RESULT_MISMATCHES: Value("i", 0),
+        NUM_OTHER_ERRORS: Value("i", 0)}
 
   def connect(self):
     self.impalad_conn = self.impalad.impala.connect(impalad=self.impalad)
@@ -1157,8 +1251,7 @@ class QueryRunner(object):
       if (not started_running_or_cancelled and query_state not in ('PENDING_STATE',
                                                       'INITIALIZED_STATE')):
         started_running_or_cancelled = True
-        if self.stress_runner:
-          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
       # Return if we're ready to fetch results (in the FINISHED state) or we are in
       # another terminal state like EXCEPTION.
       if query_state not in ('PENDING_STATE', 'INITIALIZED_STATE', 'RUNNING_STATE'):
@@ -1168,8 +1261,8 @@ class QueryRunner(object):
         if not should_cancel:
           fetch_and_set_profile(cursor, report)
         self._cancel(cursor, report)
-        if not started_running_or_cancelled and self.stress_runner:
-          self.stress_runner.increment_num_queries_started_running_or_cancelled()
+        if not started_running_or_cancelled:
+          increment(self._metrics[NUM_QUERIES_STARTED_RUNNING_OR_CANCELLED])
         return False
       if secs_since_log > 5:
         secs_since_log = 0
@@ -1177,6 +1270,20 @@ class QueryRunner(object):
       sleep(sleep_secs)
       secs_since_log += sleep_secs
 
+  def update_from_query_report(self, report):
+    LOG.debug("Updating runtime stats (Query Runner PID: {0})".format(self.proc.pid))
+    increment(self._metrics[NUM_QUERIES_FINISHED])
+    if report.not_enough_memory:
+      increment(self._metrics[NUM_QUERIES_EXCEEDED_MEM_LIMIT])
+    if report.ac_rejected:
+      increment(self._metrics[NUM_QUERIES_AC_REJECTED])
+    if report.ac_timedout:
+      increment(self._metrics[NUM_QUERIES_AC_TIMEDOUT])
+    if report.was_cancelled:
+      increment(self._metrics[NUM_QUERIES_CANCELLED])
+    if report.timed_out:
+      increment(self._metrics[NUM_QUERIES_TIMEDOUT])
+
   def _cancel(self, cursor, report):
     report.timed_out = True
 


[impala] 03/03: IMPALA-8222: disable per-query timeouts in stress test

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 57ce2bd6416008a7a270a56b8a3d6efe00d2f20a
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Feb 19 17:19:17 2019 -0800

    IMPALA-8222: disable per-query timeouts in stress test
    
    It is very hard to pick a timeout threshold for queries
    in the stress test that won't result in false positives,
    because the slowdown can be non-linear with the amount
    of load on the system.
    
    To avoid this problem, this change simply disables the
    timeout for the stress test phase. The timeout logic
    is still used for query cancellation and for generating
    random queries (since those may run for too long).
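
    The runner-side timeout choice then reduces to roughly the following (a
    sketch of the new logic in the diff below):

      if should_cancel:
        # Cancellation testing still needs a finite, randomized deadline.
        timeout = randrange(1, max(int(solo_runtime), 2))
      else:
        # Let the query run as long as necessary - see IMPALA-8222.
        timeout = maxint
      report = query_runner.run_query(query, mem_limit, timeout_secs=timeout,
                                      should_cancel=should_cancel)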
    
    Testing:
    Manually tested against my minicluster:
    * TPC-H binary search for one query
    * A short stress test run
    * Random query generation with a 1 second timeout.
    
    Change-Id: I2ee8b8ec63562031784c2a719869dce57bcafc0b
    Reviewed-on: http://gerrit.cloudera.org:8080/12531
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/stress/concurrent_select.py | 35 +++++++++++------------------------
 1 file changed, 11 insertions(+), 24 deletions(-)

diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index e352d00..67c5628 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -111,7 +111,6 @@ NUM_QUERIES_EXCEEDED_MEM_LIMIT = "num_queries_exceeded_mem_limit"
 NUM_QUERIES_AC_REJECTED = "num_queries_ac_rejected"
 NUM_QUERIES_AC_TIMEDOUT = "num_queries_ac_timedout"
 NUM_QUERIES_CANCELLED = "num_queries_cancelled"
-NUM_QUERIES_TIMEDOUT = "num_queries_timedout"
 NUM_RESULT_MISMATCHES = "num_result_mismatches"
 NUM_OTHER_ERRORS = "num_other_errors"
 
@@ -464,7 +463,7 @@ class StressRunner(object):
 
     self._status_headers = [
         "Done", "Active", "Executing", "Mem Lmt Ex", "AC Reject", "AC Timeout",
-        "Time Out", "Cancel", "Err", "Incorrect", "Next Qry Mem Lmt",
+        "Cancel", "Err", "Incorrect", "Next Qry Mem Lmt",
         "Tot Qry Mem Lmt", "Tracked Mem", "RSS Mem"]
 
     self._num_queries_to_run = None
@@ -801,10 +800,10 @@ class StressRunner(object):
         if should_cancel:
           timeout = randrange(1, max(int(solo_runtime), 2))
         else:
-          metrics = self._calc_total_runner_metrics()
-          timeout = solo_runtime * max(
-              10, metrics[NUM_QUERIES_SUBMITTED] - metrics[NUM_QUERIES_FINISHED])
-        report = query_runner.run_query(query, timeout, mem_limit,
+          # Let the query run as long as necessary - it is nearly impossible to pick a
+          # good value that won't have false positives under load - see IMPALA-8222.
+          timeout = maxint
+        report = query_runner.run_query(query, mem_limit, timeout_secs=timeout,
             should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
@@ -899,8 +898,6 @@ class StressRunner(object):
         metrics[NUM_QUERIES_AC_REJECTED],
         # AC Timed Out
         metrics[NUM_QUERIES_AC_TIMEDOUT],
-        # Time Out
-        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED],
         # Cancel
         metrics[NUM_QUERIES_CANCELLED],
         # Err
@@ -932,10 +929,7 @@ class StressRunner(object):
 
   def _check_for_test_failure(self):
     metrics = self._calc_total_runner_metrics()
-    if (
-        metrics[NUM_OTHER_ERRORS] > 0 or metrics[NUM_RESULT_MISMATCHES] > 0 or
-        metrics[NUM_QUERIES_TIMEDOUT] - metrics[NUM_QUERIES_CANCELLED] > 0
-    ):
+    if metrics[NUM_OTHER_ERRORS] > 0 or metrics[NUM_RESULT_MISMATCHES] > 0:
       LOG.error("Failing the stress test due to unexpected errors, incorrect results, or "
                 "timed out queries. See the report line above for details.")
       self.print_duration()
@@ -1134,7 +1128,6 @@ class QueryRunner(object):
         NUM_QUERIES_AC_REJECTED: Value("i", 0),
         NUM_QUERIES_AC_TIMEDOUT: Value("i", 0),
         NUM_QUERIES_CANCELLED: Value("i", 0),
-        NUM_QUERIES_TIMEDOUT: Value("i", 0),
         NUM_RESULT_MISMATCHES: Value("i", 0),
         NUM_OTHER_ERRORS: Value("i", 0)}
 
@@ -1146,8 +1139,8 @@ class QueryRunner(object):
       self.impalad_conn.close()
       self.impalad_conn = None
 
-  def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
-                should_cancel=False, retain_profile=False):
+  def run_query(self, query, mem_limit_mb, run_set_up=False,
+                timeout_secs=maxint, should_cancel=False, retain_profile=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
@@ -1281,8 +1274,6 @@ class QueryRunner(object):
       increment(self._metrics[NUM_QUERIES_AC_TIMEDOUT])
     if report.was_cancelled:
       increment(self._metrics[NUM_QUERIES_CANCELLED])
-    if report.timed_out:
-      increment(self._metrics[NUM_QUERIES_TIMEDOUT])
 
   def _cancel(self, cursor, report):
     report.timed_out = True
@@ -1548,8 +1539,8 @@ def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit,
-                                run_set_up=True, retain_profile=True)
+      report = runner.run_query(query, mem_limit, run_set_up=True,
+          timeout_secs=timeout_secs, retain_profile=True)
       if report.timed_out:
         report.write_query_profile(
             os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
@@ -2156,7 +2147,7 @@ def main():
       " amount of memory to smaller queries as to the big ones.")
   parser.add_argument(
       "--timeout-multiplier", type=float, default=1.0,
-      help="Query timeouts will be multiplied by this value.")
+      help="Deprecated - has no effect.")
   parser.add_argument("--max-queries", type=int, default=100)
   parser.add_argument(
       "--reset-databases-before-binary-search", action="store_true",
@@ -2373,10 +2364,6 @@ def main():
       query.required_mem_mb_without_spilling += int(
           query.required_mem_mb_without_spilling * args.mem_limit_padding_pct / 100.0) + \
           args.mem_limit_padding_abs
-    if query.solo_runtime_secs_with_spilling:
-      query.solo_runtime_secs_with_spilling *= args.timeout_multiplier
-    if query.solo_runtime_secs_without_spilling:
-      query.solo_runtime_secs_without_spilling *= args.timeout_multiplier
 
     # Remove any queries that would use "too many" resources. This way a larger number
     # of queries will run concurrently.


[impala] 02/03: IMPALA-7450. Set thread name during refresh/load operations

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 540278d57f9d44917c47ea070169c084cdf6dd61
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Aug 14 22:54:42 2018 -0700

    IMPALA-7450. Set thread name during refresh/load operations
    
    This adds a small utility class for annotating the current thread's name
    during potentially long-running operations such as refresh/load. With
    this change, jstack output now includes useful thread names like:
    
    During startup:
      "main [invalidating metadata - 128/428 dbs complete]"
    
    While loading a fresh table:
      "pool-4-thread-12 [Loading metadata for: foo_db.foo_table] [Loading
       metadata for all partition(s) of foo_db.foo_table]"
    
    Pool refreshing metadata for a particular path:
      "pool-23-thread-5 [Refreshing file metadata for path:
       hdfs://nameservice1/path/to/partdir..."
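
    The same idea, expressed as a Python context manager for comparison (purely
    illustrative; the patch itself is the Java ThreadNameAnnotator class below):

      import threading
      from contextlib import contextmanager

      @contextmanager
      def thread_name_annotator(annotation):
        """Temporarily append an annotation to the current thread's name."""
        thread = threading.current_thread()
        old_name = thread.name
        new_name = "%s [%s]" % (old_name, annotation)
        thread.name = new_name
        try:
          yield
        finally:
          # Restore only if nobody else renamed the thread in the meantime.
          if thread.name == new_name:
            thread.name = old_name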
    
    Tests: Verified the patch manually by jstacking a catalogd while performing
    some workload. Also added a simple unit test to verify the thread renaming
    behavior.
    
    Change-Id: Ic7c850d6bb2eedc375ee567c19eb17add335f60c
    Reviewed-on: http://gerrit.cloudera.org:8080/11228
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../impala/catalog/CatalogServiceCatalog.java      |  76 +++++-----
 .../java/org/apache/impala/catalog/HdfsTable.java  |  25 ++--
 .../org/apache/impala/catalog/TableLoader.java     |  14 +-
 .../apache/impala/util/ThreadNameAnnotator.java    |  62 ++++++++
 .../impala/util/ThreadNameAnnotatorTest.java       | 158 +++++++++++++++++++++
 5 files changed, 292 insertions(+), 43 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 89a0e3a..fee27dc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -83,10 +83,12 @@ import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
-import org.slf4j.Logger;
+import org.apache.impala.util.ThreadNameAnnotator;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
@@ -94,7 +96,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -357,28 +358,31 @@ public class CatalogServiceCatalog extends Catalog {
    * when the function returns. Returns false otherwise and no lock is held in this case.
    */
   public boolean tryLockTable(Table tbl) {
-    long begin = System.currentTimeMillis();
-    long end;
-    do {
-      versionLock_.writeLock().lock();
-      if (tbl.getLock().tryLock()) {
-        if (LOG.isTraceEnabled()) {
-          end = System.currentTimeMillis();
-          LOG.trace(String.format("Lock for table %s was acquired in %d msec",
-              tbl.getFullName(), end - begin));
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+        "Attempting to lock table " + tbl.getFullName())) {
+      long begin = System.currentTimeMillis();
+      long end;
+      do {
+        versionLock_.writeLock().lock();
+        if (tbl.getLock().tryLock()) {
+          if (LOG.isTraceEnabled()) {
+            end = System.currentTimeMillis();
+            LOG.trace(String.format("Lock for table %s was acquired in %d msec",
+                tbl.getFullName(), end - begin));
+          }
+          return true;
         }
-        return true;
-      }
-      versionLock_.writeLock().unlock();
-      try {
-        // Sleep to avoid spinning and allow other operations to make progress.
-        Thread.sleep(TBL_LOCK_RETRY_MS);
-      } catch (InterruptedException e) {
-        // ignore
-      }
-      end = System.currentTimeMillis();
-    } while (end - begin < TBL_LOCK_TIMEOUT_MS);
-    return false;
+        versionLock_.writeLock().unlock();
+        try {
+          // Sleep to avoid spinning and allow other operations to make progress.
+          Thread.sleep(TBL_LOCK_RETRY_MS);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+        end = System.currentTimeMillis();
+      } while (end - begin < TBL_LOCK_TIMEOUT_MS);
+      return false;
+    }
   }
 
   /**
@@ -1305,14 +1309,20 @@ public class CatalogServiceCatalog extends Catalog {
       Map<String, Db> newDbCache = new ConcurrentHashMap<String, Db>();
       List<TTableName> tblsToBackgroundLoad = new ArrayList<>();
       try (MetaStoreClient msClient = getMetaStoreClient()) {
-        for (String dbName: msClient.getHiveClient().getAllDatabases()) {
-          dbName = dbName.toLowerCase();
-          Db oldDb = oldDbCache.get(dbName);
-          Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
-              dbName, oldDb);
-          if (invalidatedDb == null) continue;
-          newDbCache.put(dbName, invalidatedDb.first);
-          tblsToBackgroundLoad.addAll(invalidatedDb.second);
+        List<String> allDbs = msClient.getHiveClient().getAllDatabases();
+        int numComplete = 0;
+        for (String dbName: allDbs) {
+          String annotation = String.format("invalidating metadata - %s/%s dbs complete",
+              numComplete++, allDbs.size());
+          try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
+            dbName = dbName.toLowerCase();
+            Db oldDb = oldDbCache.get(dbName);
+            Pair<Db, List<TTableName>> invalidatedDb = invalidateDb(msClient,
+                dbName, oldDb);
+            if (invalidatedDb == null) continue;
+            newDbCache.put(dbName, invalidatedDb.first);
+            tblsToBackgroundLoad.addAll(invalidatedDb.second);
+          }
         }
       }
       dbCache_.set(newDbCache);
@@ -2428,7 +2438,9 @@ public class CatalogServiceCatalog extends Catalog {
       // the timeout exception and the user can monitor the queue metric to see that it
       // is full, so the issue should be easy to diagnose.
       // TODO: Figure out if such a race is possible.
-      try {
+      try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
+            "Get Partial Catalog Object - " +
+            Catalog.toCatalogObjectKey(req.object_desc))) {
         return doGetPartialCatalogObject(req);
       } finally {
         partialObjectFetchAccess_.release();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 7a16b3d..1612b38 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -91,6 +91,7 @@ import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.TAccessLevelUtil;
 import org.apache.impala.util.TResultRowBuilder;
+import org.apache.impala.util.ThreadNameAnnotator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -338,14 +339,16 @@ public class HdfsTable extends Table implements FeFsTable {
 
     @Override
     public FileMetadataLoadStats call() throws IOException {
-      FileMetadataLoadStats loadingStats =
-          reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
-          resetAndLoadFileMetadata(hdfsPath_, partitionList_);
-      return loadingStats;
+      try (ThreadNameAnnotator tna = new ThreadNameAnnotator(debugString())) {
+        FileMetadataLoadStats loadingStats =
+            reuseFileMd_ ? refreshFileMetadata(hdfsPath_, partitionList_) :
+            resetAndLoadFileMetadata(hdfsPath_, partitionList_);
+        return loadingStats;
+      }
     }
 
-    public String debugString() {
-      String loadType = reuseFileMd_ ? "Refreshed" : "Loaded";
+    private String debugString() {
+      String loadType = reuseFileMd_ ? "Refreshing" : "Loading";
       return String.format("%s file metadata for path: %s", loadType,
           hdfsPath_.toString());
     }
@@ -560,7 +563,9 @@ public class HdfsTable extends Table implements FeFsTable {
    */
   private void refreshPartitionFileMetadata(HdfsPartition partition)
       throws CatalogException {
-    try {
+    String annotation = String.format("refreshing table %s partition %s",
+        getFullName(), partition.getPartitionName());
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
       Path partDir = partition.getLocationPath();
       // If there are no existing files in the partition, use the non-incremental path.
       boolean useExistingFds = partition.hasFileDescriptors();
@@ -1205,7 +1210,11 @@ public class HdfsTable extends Table implements FeFsTable {
       Set<String> partitionsToUpdate) throws TableLoadingException {
     final Timer.Context context =
         getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
-    try {
+    String annotation = String.format("%s metadata for %s partition(s) of %s.%s",
+        reuseMetadata ? "Reloading" : "Loading",
+        partitionsToUpdate == null ? "all" : String.valueOf(partitionsToUpdate.size()),
+        msTbl.getDbName(), msTbl.getTableName());
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation)) {
       // turn all exceptions into TableLoadingException
       msTable_ = msTbl;
       try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 2a1ee15..40638a1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -18,12 +18,16 @@
 package org.apache.impala.catalog;
 
 import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Stopwatch;
+
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.util.ThreadNameAnnotator;
 
 /**
  * Class that implements the logic for how a table's metadata should be loaded from
@@ -54,11 +58,14 @@ public class TableLoader {
    * an IncompleteTable will be returned that contains details on the error.
    */
   public Table load(Db db, String tblName) {
+    Stopwatch sw = new Stopwatch().start();
     String fullTblName = db.getName() + "." + tblName;
-    LOG.info("Loading metadata for: " + fullTblName);
+    String annotation = "Loading metadata for: " + fullTblName;
+    LOG.info(annotation);
     Table table;
     // turn all exceptions into TableLoadingException
-    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+    try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
+         MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       org.apache.hadoop.hive.metastore.api.Table msTbl = null;
       // All calls to getTable() need to be serialized due to HIVE-5457.
       synchronized (metastoreAccessLock_) {
@@ -94,7 +101,8 @@ public class TableLoader {
           "Failed to load metadata for table: " + fullTblName + ". Running " +
           "'invalidate metadata " + fullTblName + "' may resolve this problem.", e));
     }
-    LOG.info("Loaded metadata for: " + fullTblName);
+    LOG.info("Loaded metadata for: " + fullTblName + " (" +
+        sw.elapsedTime(TimeUnit.MILLISECONDS) + "ms)");
     return table;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/util/ThreadNameAnnotator.java b/fe/src/main/java/org/apache/impala/util/ThreadNameAnnotator.java
new file mode 100644
index 0000000..c235986
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/ThreadNameAnnotator.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * AutoCloseable implementation which modifies the current thread's name during
+ * a scope. This can be used in a try-with-resources block around long operations
+ * in order to make it easier for an operator to understand what's going on while
+ * reading a jstack. For example, when making calls to external services which might
+ * respond slowly, it may be useful to know more details of the higher-level operation
+ * that is blocked or which service is being waited upon.
+ *
+ * Intended to be used by the renamed thread itself to temporarily rename that
+ * thread. Used in lieu of renaming the thread at thread start or when obtaining
+ * a thread from the thread pool. Primarily for debugging as above. Is not a
+ * substitute for logging since the name is ephemeral and only available via jstack.
+ *
+ * Example usage:
+ * <code>
+ *   try (ThreadNameAnnotator tna = new ThreadNameAnnotator("downloading " + url)) {
+ *     doFetch(url);
+ *   }
+ * </code>
+ */
+public class ThreadNameAnnotator implements AutoCloseable {
+  private final Thread thr_;
+  private final String oldName_;
+  private final String newName_;
+
+  public ThreadNameAnnotator(String annotation) {
+    thr_ = Thread.currentThread();
+    oldName_ = thr_.getName();
+    newName_ = oldName_ + " [" + annotation + "]";
+    thr_.setName(newName_);
+  }
+
+  @Override
+  public void close() {
+    // Must be called in the renamed thread itself.
+    Preconditions.checkState(thr_ == Thread.currentThread());
+    // Only reset the thread name if it hasn't been changed by someone else in the
+    // meantime.
+    if (thr_.getName().equals(newName_)) thr_.setName(oldName_);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/util/ThreadNameAnnotatorTest.java b/fe/src/test/java/org/apache/impala/util/ThreadNameAnnotatorTest.java
new file mode 100644
index 0000000..65a1e3d
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/ThreadNameAnnotatorTest.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ThreadNameAnnotatorTest {
+
+  /**
+   * Simple synchronizer that forces two threads to work in
+   * lock-step by stepping from one state to the next.
+   * Each thread waits for a state, does something, sets the
+   * new state, and waits for the other thread to advance the
+   * state again.
+   */
+  private static class Stepper {
+    private String state_ = "<start>";
+
+    public synchronized void setState(String state) {
+      state_ = state;
+      notifyAll();
+    }
+
+    public synchronized void waitForState(String state) {
+      for (;;) {
+        if (state_.equals(state)) return;
+        try {
+          wait(15_000);
+        } catch (InterruptedException e) {
+          fail("Test timed out, likely test flow error");
+        }
+      }
+    }
+
+    public void advance(String newState, String waitFor) throws InterruptedException {
+      setState(newState);
+      waitForState(waitFor);
+    }
+  }
+
+  /**
+   * Thread which uses the thread annotator to rename itself and a stepper
+   * to synchronize with the test thread.
+   */
+  public static class AnnotatedThread extends Thread {
+    private final Stepper stepper_;
+
+    public AnnotatedThread(Stepper stepper) {
+      stepper_ = stepper;
+    }
+
+    public AnnotatedThread(String name, Stepper stepper) {
+      super(name);
+      stepper_ = stepper;
+    }
+
+    @Override
+    public void run() {
+      try {
+        stepper_.advance("before", "annotate");
+        try (ThreadNameAnnotator tna = new ThreadNameAnnotator("annotated")) {
+          stepper_.advance("annotated","continue");
+        }
+        stepper_.advance("after", "done");
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Verify that the thread annotator works for the basic case.
+   */
+  @Test
+  public void testAnnotator() throws InterruptedException {
+    Stepper stepper = new Stepper();
+    // Thread with name "orig"
+    Thread thread = new AnnotatedThread("orig", stepper);
+    assertEquals("orig", thread.getName());
+    thread.start();
+    stepper.waitForState("before");
+    assertEquals("orig", thread.getName());
+    stepper.advance("annotate", "annotated");
+    // Annotator should have added our "annotated" annotation
+    assertEquals("orig [annotated]", thread.getName());
+    stepper.advance("continue", "after");
+    // Original thread name should be restored
+    assertEquals("orig", thread.getName());
+    stepper.setState("done");
+    thread.join();
+    assertEquals("orig", thread.getName());
+  }
+
+  /**
+   * Test that the annotator will not restore the original thread name
+   * if the thread name is changed out from under the annotator.
+   */
+  @Test
+  public void testExternalRename() throws InterruptedException {
+    Stepper stepper = new Stepper();
+    Thread thread = new AnnotatedThread("orig", stepper);
+    assertEquals("orig", thread.getName());
+    thread.start();
+    stepper.waitForState("before");
+    assertEquals("orig", thread.getName());
+    stepper.advance("annotate", "annotated");
+    // Thread is annotated as before
+    assertEquals("orig [annotated]", thread.getName());
+    // Thread name is changed out from under the annotator
+    thread.setName("gotcha");
+    stepper.advance("continue", "after");
+    // Annotator noticed the change, did not restore original name
+    assertEquals("gotcha", thread.getName());
+    stepper.setState("done");
+    thread.join();
+    assertEquals("gotcha", thread.getName());
+  }
+
+  /**
+   * Test the degenerate case that the original thread name is the one
+   * assigned by Java.
+   */
+  @Test
+  public void testDefaultName() throws InterruptedException {
+    Stepper stepper = new Stepper();
+    Thread thread = new AnnotatedThread(stepper);
+    String orig = thread.getName();
+    thread.start();
+    stepper.waitForState("before");
+    assertEquals(orig, thread.getName());
+    stepper.advance("annotate", "annotated");
+    assertEquals(orig + " [annotated]", thread.getName());
+    stepper.advance("continue", "after");
+    assertEquals(orig, thread.getName());
+    stepper.setState("done");
+    thread.join();
+    assertEquals(orig, thread.getName());
+  }
+}