You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/09 01:20:40 UTC
[2/3] impala git commit: IMPALA-6227: more logging in
test_admission_controller
IMPALA-6227: more logging in test_admission_controller
To help debug the occasional flakiness, let's log what each query
is doing by default.
Change-Id: Icbb58a9be4a5d023c1ee3fd76e5992dfba03188c
Reviewed-on: http://gerrit.cloudera.org:8080/9555
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/35d45947
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/35d45947
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/35d45947
Branch: refs/heads/master
Commit: 35d459479edfda3601afa0ed2e9985de58bedbe5
Parents: 778823c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 7 17:12:13 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Mar 9 00:00:18 2018 +0000
----------------------------------------------------------------------
.../custom_cluster/test_admission_controller.py | 38 ++++++++++----------
1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/35d45947/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index bed6994..e5b4f99 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -159,8 +159,8 @@ def impalad_admission_ctrl_config_args(additional_args=""):
"-llama_site_path %s -disable_admission_control=false %s" %\
(fs_allocation_path, llama_site_path, additional_args))
-def log_metrics(log_prefix, metrics, log_level=logging.DEBUG):
- LOG.log(log_level, "%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
+def log_metrics(log_prefix, metrics):
+ LOG.info("%sadmitted=%s, queued=%s, dequeued=%s, rejected=%s, "\
"released=%s, timed-out=%s", log_prefix, metrics['admitted'], metrics['queued'],
metrics['dequeued'], metrics['rejected'], metrics['released'],
metrics['timed-out'])
@@ -542,10 +542,10 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
log_metrics("wait_for_metric_changes, current=", current)
deltas = compute_metric_deltas(current, initial)
delta_sum = sum([ deltas[x] for x in metric_names ])
- LOG.debug("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
+ LOG.info("DeltaSum=%s Deltas=%s (Expected=%s for metrics=%s)",\
delta_sum, deltas, expected_delta, metric_names)
if delta_sum >= expected_delta:
- LOG.debug("Found all %s metrics after %s seconds", delta_sum,
+ LOG.info("Found all %s metrics after %s seconds", delta_sum,
round(time() - start_time, 1))
return (deltas, current)
assert (time() - start_time < STRESS_TIMEOUT),\
@@ -575,7 +575,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
assert (time() - start_time < STRESS_TIMEOUT),\
"Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))
- LOG.debug("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
+ LOG.info("Waited %s for %s heartbeats", round(time() - start_time, 1), heartbeats)
def wait_for_admitted_threads(self, num_threads):
"""
@@ -585,7 +585,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
to self.executing_threads.
"""
start_time = time()
- LOG.debug("Waiting for %s threads to begin execution", num_threads)
+ LOG.info("Waiting for %s threads to begin execution", num_threads)
# All individual list operations are thread-safe, so we don't need to use a
# lock to synchronize before checking the list length (on which another thread
# may call append() concurrently).
@@ -594,7 +594,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
"%s admitted client rpcs to return. Only %s executing " % (
STRESS_TIMEOUT, num_threads, len(self.executing_threads)))
sleep(0.1)
- LOG.debug("Found all %s admitted threads after %s seconds", num_threads,
+ LOG.info("Found all %s admitted threads after %s seconds", num_threads,
round(time() - start_time, 1))
def end_admitted_queries(self, num_queries):
@@ -602,14 +602,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
Requests each admitted query to end its query.
"""
assert len(self.executing_threads) >= num_queries
- LOG.debug("Requesting {0} clients to end queries".format(num_queries))
+ LOG.info("Requesting {0} clients to end queries".format(num_queries))
# Request admitted clients to end their queries
current_executing_queries = []
for i in xrange(num_queries):
# pop() is thread-safe, it's OK if another thread is appending concurrently.
thread = self.executing_threads.pop(0)
- LOG.debug("Cancelling query %s", thread.query_num)
+ LOG.info("Cancelling query %s", thread.query_num)
assert thread.query_state == 'ADMITTED'
current_executing_queries.append(thread)
thread.query_state = 'REQUEST_QUERY_END'
@@ -673,22 +673,22 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
if self.query_end_behavior == 'QUERY_TIMEOUT':
client.execute("SET QUERY_TIMEOUT_S={0}".format(QUERY_END_TIMEOUT_S))
- LOG.debug("Submitting query %s", self.query_num)
+ LOG.info("Submitting query %s", self.query_num)
self.query_handle = client.execute_async(query)
except ImpalaBeeswaxException as e:
if re.search("Rejected.*queue full", str(e)):
- LOG.debug("Rejected query %s", self.query_num)
+ LOG.info("Rejected query %s", self.query_num)
self.query_state = 'REJECTED'
return
elif "exceeded timeout" in str(e):
- LOG.debug("Query %s timed out", self.query_num)
+ LOG.info("Query %s timed out", self.query_num)
self.query_state = 'TIMED OUT'
return
else:
raise e
finally:
self.lock.release()
- LOG.debug("Admitted query %s", self.query_num)
+ LOG.info("Admitted query %s", self.query_num)
self.query_state = 'ADMITTED'
# The thread becomes visible to the main thread when it is added to the
# shared list of executing_threads. append() is atomic and thread-safe.
@@ -716,14 +716,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
self.error = e
self.query_state = 'ERROR'
finally:
- LOG.debug("Thread terminating in state=%s", self.query_state)
+ LOG.info("Thread terminating in state=%s", self.query_state)
if client is not None:
client.close()
def _end_query(self, client, query):
"""Bring the query to the appropriate end state defined by self.query_end_behaviour.
Returns once the query has reached that state."""
- LOG.debug("Ending query %s by %s",
+ LOG.info("Ending query %s by %s",
str(self.query_handle.get_handle()), self.query_end_behavior)
if self.query_end_behavior == 'QUERY_TIMEOUT':
# Sleep and wait for the query to be cancelled. The cancellation will
@@ -771,7 +771,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
return num_queued
def run_admission_test(self, vector, additional_query_options):
- LOG.debug("Starting test case with parameters: %s", vector)
+ LOG.info("Starting test case with parameters: %s", vector)
self.impalads = self.cluster.impalads
round_robin_submission = vector.get_value('round_robin_submission')
submission_delay_ms = vector.get_value('submission_delay_ms')
@@ -798,7 +798,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
# ends the query. This prevents queued queries from being dequeued in the background
# without this thread explicitly ending them, so that the test can admit queries in
# discrete waves.
- LOG.debug("Wait for initial admission decisions")
+ LOG.info("Wait for initial admission decisions")
(metric_deltas, curr_metrics) = self.wait_for_metric_changes(\
['admitted', 'queued', 'rejected'], initial_metrics, num_queries)
# Also wait for the test threads that submitted the queries to start executing.
@@ -837,7 +837,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
curr_metrics = self.get_admission_metrics();
log_metrics("Main loop, curr_metrics: ", curr_metrics);
num_to_end = len(self.executing_threads)
- LOG.debug("Main loop, will request %s queries to end", num_to_end)
+ LOG.info("Main loop, will request %s queries to end", num_to_end)
self.end_admitted_queries(num_to_end)
self.wait_for_metric_changes(['released'], curr_metrics, num_to_end)
@@ -857,7 +857,7 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
self.wait_for_statestore_updates(10)
final_metrics = self.get_admission_metrics();
- log_metrics("Final metrics: ", final_metrics, logging.INFO);
+ log_metrics("Final metrics: ", final_metrics);
metric_deltas = compute_metric_deltas(final_metrics, initial_metrics)
assert metric_deltas['timed-out'] == 0