You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/09 01:20:40 UTC

[2/3] impala git commit: IMPALA-6227: more logging in test_admission_controller

IMPALA-6227: more logging in test_admission_controller

To help debug the occasional flakiness, let's log what each query
is doing by default (promote these log statements from DEBUG to INFO).

Change-Id: Icbb58a9be4a5d023c1ee3fd76e5992dfba03188c
Reviewed-on: http://gerrit.cloudera.org:8080/9555
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/35d45947
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/35d45947
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/35d45947

Branch: refs/heads/master
Commit: 35d459479edfda3601afa0ed2e9985de58bedbe5
Parents: 778823c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 7 17:12:13 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Mar 9 00:00:18 2018 +0000

----------------------------------------------------------------------
 .../custom_cluster/test_admission_controller.py | 38 ++++++++++----------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/35d45947/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index bed6994..e5b4f99 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -159,8 +159,8 @@ def impalad_admission_ctrl_config_args(additional_args=""):
         "-llama_site_path %s -disable_admission_control=false %s" %\
         (fs_allocation_path, llama_site_path, additional_args))
 
-def log_metrics(log_prefix, metrics, log_level=logging.DEBUG):
-  LOG.log(log_level, "%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
+def log_metrics(log_prefix, metrics):
+  LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
       "released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
       metrics['dequeued'], metrics['rejected'], metrics['released'],
       metrics['timed-out'])
@@ -542,10 +542,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       log_metrics("wait_for_metric_changes, current=", current)
       deltas = compute_metric_deltas(current, initial)
       delta_sum = sum([ deltas[x] for x in metric_names ])
-      LOG.debug("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
+      LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
           delta_sum, deltas, expected_delta, metric_names)
       if delta_sum >= expected_delta:
-        LOG.debug("Found all %s metrics after %s seconds", delta_sum,
+        LOG.info("Found all %s metrics after %s seconds", delta_sum,
             round(time() - start_time, 1))
         return (deltas, current)
       assert (time() - start_time < STRESS_TIMEOUT),\
@@ -575,7 +575,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
       sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
-    LOG.debug("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
+    LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
 
   def wait_for_admitted_threads(self, num_threads):
     """
@@ -585,7 +585,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     to self.executing_threads.
     """
     start_time = time()
-    LOG.debug("Waiting for %s threads to begin execution", num_threads)
+    LOG.info("Waiting for %s threads to begin execution", num_threads)
     # All individual list operations are thread-safe, so we don't need to use a
     # lock to synchronize before checking the list length (on which another thread
     # may call append() concurrently).
@@ -594,7 +594,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
           "%s admitted client rpcs to return. Only %s executing " % (
           STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
       sleep(0.1)
-    LOG.debug("Found all %s admitted threads after %s seconds", num_threads,
+    LOG.info("Found all %s admitted threads after %s seconds", num_threads,
         round(time() - start_time, 1))
 
   def end_admitted_queries(self, num_queries):
@@ -602,14 +602,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     Requests each admitted query to end its query.
     """
     assert len(self.executing_threads) >= num_queries
-    LOG.debug("Requesting {0} clients to end queries".format(num_queries))
+    LOG.info("Requesting {0} clients to end queries".format(num_queries))
 
     # Request admitted clients to end their queries
     current_executing_queries = []
     for i in xrange(num_queries):
       # pop() is thread-safe, it's OK if another thread is appending concurrently.
       thread = self.executing_threads.pop(0)
-      LOG.debug("Cancelling query %s", thread.query_num)
+      LOG.info("Cancelling query %s", thread.query_num)
       assert thread.query_state == 'ADMITTED'
       current_executing_queries.append(thread)
       thread.query_state = 'REQUEST_QUERY_END'
@@ -673,22 +673,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
           if self.query_end_behavior == 'QUERY_TIMEOUT':
             client.execute("SET QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S))
 
-          LOG.debug("Submitting query %s", self.query_num)
+          LOG.info("Submitting query %s", self.query_num)
           self.query_handle = client.execute_async(query)
         except ImpalaBeeswaxException as e:
           if re.search("Rejected.*queue full", str(e)):
-            LOG.debug("Rejected query %s", self.query_num)
+            LOG.info("Rejected query %s", self.query_num)
             self.query_state = 'REJECTED'
             return
           elif "exceeded timeout" in str(e):
-            LOG.debug("Query %s timed out", self.query_num)
+            LOG.info("Query %s timed out", self.query_num)
             self.query_state = 'TIMED OUT'
             return
           else:
             raise e
         finally:
           self.lock.release()
-        LOG.debug("Admitted query %s", self.query_num)
+        LOG.info("Admitted query %s", self.query_num)
         self.query_state = 'ADMITTED'
         # The thread becomes visible to the main thread when it is added to the
         # shared list of executing_threads. append() is atomic and thread-safe.
@@ -716,14 +716,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
         self.error = e
         self.query_state = 'ERROR'
       finally:
-        LOG.debug("Thread terminating in state=%s", self.query_state)
+        LOG.info("Thread terminating in state=%s", self.query_state)
         if client is not None:
           client.close()
 
     def _end_query(self, client, query):
       """Bring the query to the appropriate end state defined by self.query_end_behaviour.
       Returns once the query has reached that state."""
-      LOG.debug("Ending query %s by %s",
+      LOG.info("Ending query %s by %s",
           str(self.query_handle.get_handle()), self.query_end_behavior)
       if self.query_end_behavior == 'QUERY_TIMEOUT':
         # Sleep and wait for the query to be cancelled. The cancellation will
@@ -771,7 +771,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     return num_queued
 
   def run_admission_test(self, vector, additional_query_options):
-    LOG.debug("Starting test case with parameters: %s", vector)
+    LOG.info("Starting test case with parameters: %s", vector)
     self.impalads = self.cluster.impalads
     round_robin_submission = vector.get_value('round_robin_submission')
     submission_delay_ms = vector.get_value('submission_delay_ms')
@@ -798,7 +798,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
     # ends the query. This prevents queued queries from being dequeued in the background
     # without this thread explicitly ending them, so that the test can admit queries in
     # discrete waves.
-    LOG.debug("Wait for initial admission decisions")
+    LOG.info("Wait for initial admission decisions")
     (metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
         ['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
     # Also wait for the test threads that submitted the queries to start executing.
@@ -837,7 +837,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       curr_metrics = self.get_admission_metrics();
       log_metrics("Main loop, curr_metrics: ", curr_metrics);
       num_to_end = len(self.executing_threads)
-      LOG.debug("Main loop, will request %s queries to end", num_to_end)
+      LOG.info("Main loop, will request %s queries to end", num_to_end)
       self.end_admitted_queries(num_to_end)
       self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)
 
@@ -857,7 +857,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       self.wait_for_statestore_updates(10)
 
     final_metrics = self.get_admission_metrics();
-    log_metrics("Final metrics: ", final_metrics, logging.INFO);
+    log_metrics("Final metrics: ", final_metrics);
     metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
     assert metric_deltas['timed-out'] == 0