You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/20 16:57:26 UTC

[3/3] impala git commit: IMPALA-6497: add "Last row fetched" and AC events

IMPALA-6497: add "Last row fetched" and AC events

This adds "Last row fetched" and "Released admission control resources"
events to the query profile, making it more observable that all rows were
returned to the client and that admission control resources were released.

Testing:
Manually inspected some query profiles.

Added a basic observability test that ensures that the expected events
appear in the profile. Ran it in a loop for a bit to make sure it wasn't
flaky.

Change-Id: I32a707e4660061e75c86ad967f1fac6f6737da7e
Reviewed-on: http://gerrit.cloudera.org:8080/9271
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/96991ed8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/96991ed8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/96991ed8

Branch: refs/heads/2.x
Commit: 96991ed8f3f94994fc148b67dfc99cb21c77d92c
Parents: dd3b3db
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Feb 9 12:00:54 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 12:50:38 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          |  4 ++-
 tests/query_test/test_observability.py | 56 ++++++++++++++++++++++++-----
 2 files changed, 50 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/96991ed8/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7973775..b69b848 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -863,6 +863,7 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
+    query_events_->MarkEvent("Last row fetched");
     // release query execution resources here, since we won't be fetching more result rows
     ReleaseExecResources();
     // wait for all backends to complete before computing the summary
@@ -1087,12 +1088,13 @@ void Coordinator::ReleaseAdmissionControlResources() {
 
 void Coordinator::ReleaseAdmissionControlResourcesLocked() {
   if (released_admission_control_resources_) return;
-  LOG(INFO) << "Release admssion control resources for query "
+  LOG(INFO) << "Release admission control resources for query "
             << PrintId(query_ctx_.query_id);
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
   if (admission_controller != nullptr) admission_controller->ReleaseQuery(schedule_);
   released_admission_control_resources_ = true;
+  query_events_->MarkEvent("Released admission control resources");
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {

http://git-wip-us.apache.org/repos/asf/impala/blob/96991ed8/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 10ef6b2..58793af 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -20,6 +20,7 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.impala_cluster import ImpalaCluster
 import logging
 import pytest
+import re
 import time
 
 class TestObservability(ImpalaTestSuite):
@@ -184,16 +185,53 @@ class TestObservability(ImpalaTestSuite):
       query_id, MAX_WAIT)
     assert False, dbg_str
 
-  def test_query_profile_contains_instance_events(self, unique_database):
+  def test_query_profile_contains_query_events(self):
+    """Test that the expected events show up in a query profile."""
+    event_regexes = [r'Query Timeline:',
+        r'Query submitted:',
+        r'Planning finished:',
+        r'Submit for admission:',
+        r'Completed admission:',
+        r'Ready to start on .* backends:',
+        r'All .* execution backends \(.* fragment instances\) started:',
+        r'Rows available:',
+        r'First row fetched:',
+        r'Last row fetched:',
+        r'Released admission control resources:']
+    query = "select * from functional.alltypes"
+    runtime_profile = self.execute_query(query).runtime_profile
+    self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def test_query_profile_contains_instance_events(self):
     """Test that /query_profile_encoded contains an event timeline for fragment
     instances, even when there are errors."""
-    events = ["Fragment Instance Lifecycle Event Timeline",
-              "Prepare Finished",
-              "Open Finished",
-              "First Batch Produced",
-              "First Batch Sent",
-              "ExecInternal Finished"]
+    event_regexes = [r'Fragment Instance Lifecycle Event Timeline',
+                     r'Prepare Finished',
+                     r'Open Finished',
+                     r'First Batch Produced',
+                     r'First Batch Sent',
+                     r'ExecInternal Finished']
     query = "select count(*) from functional.alltypes"
     runtime_profile = self.execute_query(query).runtime_profile
-    for event in events:
-      assert event in runtime_profile
+    self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def __verify_profile_event_sequence(self, event_regexes, runtime_profile):
+    """Check that 'event_regexes' appear in a consecutive series of lines in
+       'runtime_profile'"""
+    lines = runtime_profile.splitlines()
+    event_regex_index = 0
+
+    # Check that the regexes match consecutive profile lines, in list order, with no gaps.
+    for line in runtime_profile.splitlines():
+      match = re.search(event_regexes[event_regex_index], line)
+      if match is not None:
+        event_regex_index += 1
+        if event_regex_index == len(event_regexes):
+          # Found all the lines - we're done.
+          return
+      else:
+        # Haven't found the first regex yet.
+        assert event_regex_index == 0, \
+            event_regexes[event_regex_index] + " not in " + line + "\n" + runtime_profile
+    assert event_regex_index == len(event_regexes), \
+        "Didn't find all events in profile: \n" + runtime_profile