You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/30 15:29:21 UTC

[1/2] impala git commit: IMPALA-7166: ExecSummary should be a first class object.

Repository: impala
Updated Branches:
  refs/heads/master 44e69e818 -> 932bf2dd8


IMPALA-7166: ExecSummary should be a first class object.

Impala's RuntimeProfile currently contains "ExecSummary" only as a string. We should make it a
first-class Thrift object so that tools can extract its fields (Est. rows, etc.).

Testing:
Modified the RuntimeProfile unit test (runtime-profile-test.cc).

Change-Id: I4791237a5579f16c9efda8e57876d48980739e13
Reviewed-on: http://gerrit.cloudera.org:8080/11555
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bae27edf
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bae27edf
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bae27edf

Branch: refs/heads/master
Commit: bae27edf532d4e29ad8a83bf2ddd3b1b43f8a23f
Parents: 44e69e8
Author: Yongjun Zhang <yz...@cloudera.com>
Authored: Mon Oct 1 12:23:22 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Oct 30 01:11:30 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc     |  1 +
 be/src/util/runtime-profile-test.cc | 14 ++++++++++++--
 be/src/util/runtime-profile.cc      | 19 ++++++++++++++++++-
 be/src/util/runtime-profile.h       | 15 +++++++++++++++
 common/thrift/RuntimeProfile.thrift |  2 ++
 5 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bae27edf/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index af9414c..e7928f4 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1167,6 +1167,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
   if (request_state->GetCoordinator() != nullptr) {
     TExecSummary t_exec_summary;
     request_state->GetCoordinator()->GetTExecSummary(&t_exec_summary);
+    request_state->summary_profile()->SetTExecSummary(t_exec_summary);
     string exec_summary = PrintExecSummary(t_exec_summary);
     request_state->summary_profile()->AddInfoStringRedacted("ExecSummary", exec_summary);
     request_state->summary_profile()->AddInfoStringRedacted("Errors",

http://git-wip-us.apache.org/repos/asf/impala/blob/bae27edf/be/src/util/runtime-profile-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 3a49042..4a54f0a 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -43,7 +43,7 @@ TEST(CountersTest, Basic) {
   profile_a->AddChild(profile_a2);
 
   // Test Empty
-  profile_a->ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile);
   EXPECT_EQ(thrift_profile.nodes.size(), 3);
   thrift_profile.nodes.clear();
 
@@ -63,12 +63,22 @@ TEST(CountersTest, Basic) {
   counter_b = profile_a2->AddCounter("B", TUnit::BYTES);
   EXPECT_TRUE(counter_b != NULL);
 
+  // Update status to be included in ExecSummary
+  TExecSummary exec_summary;
+  TStatus status;
+  status.__set_status_code(TErrorCode::CANCELLED);
+  exec_summary.__set_status(status);
+  profile_a->SetTExecSummary(exec_summary);
+
   // Serialize/deserialize
-  profile_a->ToThrift(&thrift_profile.nodes);
+  profile_a->ToThrift(&thrift_profile);
   RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile);
   counter_merged = from_thrift->GetCounter("A");
   EXPECT_EQ(counter_merged->value(), 1);
   EXPECT_TRUE(from_thrift->GetCounter("Not there") ==  NULL);
+  TExecSummary exec_summary_result;
+  from_thrift->GetExecSummary(&exec_summary_result);
+  EXPECT_EQ(exec_summary_result.status, status);
 
   // Averaged
   RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);

http://git-wip-us.apache.org/repos/asf/impala/blob/bae27edf/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index dd83ee6..d7eba44 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -111,7 +111,9 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
     const TRuntimeProfileTree& profiles) {
   if (profiles.nodes.size() == 0) return NULL;
   int idx = 0;
-  return RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx);
+  RuntimeProfile* profile = RuntimeProfile::CreateFromThrift(pool, profiles.nodes, &idx);
+  profile->SetTExecSummary(profiles.exec_summary);
+  return profile;
 }
 
 RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
@@ -848,9 +850,15 @@ Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
   return Status::OK();;
 }
 
+void RuntimeProfile::SetTExecSummary(const TExecSummary& summary) {
+  lock_guard<SpinLock> l(t_exec_summary_lock_);
+  t_exec_summary_ = summary;
+}
+
 void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
   tree->nodes.clear();
   ToThrift(&tree->nodes);
+  ExecSummaryToThrift(tree);
 }
 
 void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
@@ -939,6 +947,15 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const {
   }
 }
 
+void RuntimeProfile::ExecSummaryToThrift(TRuntimeProfileTree* tree) const {
+  GetExecSummary(&tree->exec_summary);
+}
+
+void RuntimeProfile::GetExecSummary(TExecSummary* t_exec_summary) const {
+  lock_guard<SpinLock> l(t_exec_summary_lock_);
+  *t_exec_summary = t_exec_summary_;
+}
+
 int64_t RuntimeProfile::UnitsPerSecond(
     const RuntimeProfile::Counter* total_counter,
     const RuntimeProfile::Counter* timer) {

http://git-wip-us.apache.org/repos/asf/impala/blob/bae27edf/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index e0048d2..9408012 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -365,6 +365,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// This function updates local_time_percent_ for each profile.
   void ComputeTimeInProfile();
 
+  /// Set ExecSummary
+  void SetTExecSummary(const TExecSummary& summary);
+
+  /// Get a copy of exec_summary tp t_exec_summary
+  void GetExecSummary(TExecSummary* t_exec_summary) const;
+
  private:
   /// Pool for allocated counters. Usually owned by the creator of this
   /// object, but occasionally allocated in the constructor.
@@ -464,6 +470,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// ComputeTimeInProfile()
   int64_t local_time_ns_;
 
+  /// The Exec Summary
+  TExecSummary t_exec_summary_;
+
+  /// Protects exec_summary.
+  mutable SpinLock t_exec_summary_lock_;
+
   /// Constructor used by Create().
   RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile);
 
@@ -483,6 +495,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   void AddInfoStringInternal(
       const std::string& key, std::string value, bool append, bool redact = false);
 
+  /// Send exec_summary to thrift
+  void ExecSummaryToThrift(TRuntimeProfileTree* tree) const;
+
   /// Name of the counter maintaining the total time.
   static const std::string TOTAL_TIME_COUNTER_NAME;
   static const std::string LOCAL_TIME_COUNTER_NAME;

http://git-wip-us.apache.org/repos/asf/impala/blob/bae27edf/common/thrift/RuntimeProfile.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index 8751b17..e7ad3c6 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -18,6 +18,7 @@
 namespace cpp impala
 namespace java org.apache.impala.thrift
 
+include "ExecStats.thrift"
 include "Metrics.thrift"
 
 // Counter data
@@ -99,4 +100,5 @@ struct TRuntimeProfileNode {
 // pre-order traversal
 struct TRuntimeProfileTree {
   1: required list<TRuntimeProfileNode> nodes
+  2: optional ExecStats.TExecSummary exec_summary
 }


[2/2] impala git commit: IMPALA-7735: Expose queuing status in ExecSummary and impala-shell

Posted by ta...@apache.org.
IMPALA-7735: Expose queuing status in ExecSummary and impala-shell

This patch adds the queuing status (that is, whether the query was
queued and what the latest queuing reason was) to the ExecSummary.
It also allows impala-shell to expose this status, pulling it out of
the ExecSummary when either live_summary or live_progress is set to
true.

Testing:
Added custom cluster tests.

Change-Id: Ibde447b01559b9f0f3e970d4fa10f6ee4064bd49
Reviewed-on: http://gerrit.cloudera.org:8080/11816
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/932bf2dd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/932bf2dd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/932bf2dd

Branch: refs/heads/master
Commit: 932bf2dd8170563e7d82c8711a7b89f62bc2f6cb
Parents: bae27ed
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Oct 22 16:58:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Oct 30 02:02:37 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc                 | 13 +++++
 common/thrift/ExecStats.thrift                  |  7 +++
 shell/impala_shell.py                           | 11 ++++-
 .../custom_cluster/test_admission_controller.py | 44 ++++++++---------
 tests/custom_cluster/test_shell_interactive.py  | 51 ++++++++++++++++++++
 5 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e7928f4..9fa05e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -59,6 +59,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
@@ -738,6 +739,18 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
       if (request_state->operation_state() == TOperationState::PENDING_STATE) {
+        const string* admission_result = request_state->summary_profile()->GetInfoString(
+            AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
+        if (admission_result != nullptr) {
+          if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) {
+            result->__set_is_queued(true);
+            const string* queued_reason = request_state->summary_profile()->GetInfoString(
+                AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON);
+            if (queued_reason != nullptr) {
+              result->__set_queued_reason(*queued_reason);
+            }
+          }
+        }
         return Status::OK();
       } else if (request_state->GetCoordinator() != nullptr) {
         request_state->GetCoordinator()->GetTExecSummary(result);

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 68a8fd8..9c759e9 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -100,4 +100,11 @@ struct TExecSummary {
 
   // Optional record indicating the query progress
   6: optional TExecProgress progress
+
+  // Set to true if the query is currently queued by admission control.
+  7: optional bool is_queued
+
+  // Contains the latest queuing reason if the query is currently queued by admission
+  // control.
+  8: optional string queued_reason
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 9277071..4f09bd8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -963,7 +963,16 @@ class ImpalaShell(object, cmd.Cmd):
     checkpoint = time.time()
     if checkpoint - self.last_summary > self.PROGRESS_UPDATE_INTERVAL:
       summary = self.imp_client.get_summary(self.last_query_handle)
-      if summary and summary.progress:
+      if not summary:
+        return
+
+      if summary.is_queued:
+        queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
+        self.progress_stream.write(queued_msg)
+        self.last_summary = time.time()
+        return
+
+      if summary.progress:
         progress = summary.progress
 
         # If the data is not complete return and wait for a good result.

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 018f66d..6b3100f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -824,41 +824,39 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
           pool_max_mem=1024 * 1024 * 1024))
   @needs_session()
-  def test_hs2_admission_controller_logs(self):
-    """Test to verify that the GetLog() function invoked by the HS2 client returns the
-    reason for queuing of the query."""
+  def test_queuing_status_through_query_log_and_exec_summary(self):
+    """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
+    the query's queuing status, that is, whether the query was queued and what was the
+    latest queuing reason."""
     # Start a long running query.
-    long_query_req = TCLIService.TExecuteStatementReq()
-    long_query_req.sessionHandle = self.session_handle
-    long_query_req.statement = "select sleep(1000000)"
-    long_query_resp = self.hs2_client.ExecuteStatement(long_query_req)
-    HS2TestSuite.check_response(long_query_resp)
-    # Ensure that the query is running.
+    long_query_resp = self.execute_statement("select sleep(10000)")
+    # Ensure that the query has started executing.
     self.wait_for_admission_control(long_query_resp.operationHandle)
     # Submit another query.
-    execute_statement_req = TCLIService.TExecuteStatementReq()
-    execute_statement_req.sessionHandle = self.session_handle
-    execute_statement_req.statement = "select 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
+    queued_query_resp = self.execute_statement("select 1")
     # Wait until the query is queued.
-    self.wait_for_operation_state(execute_statement_resp.operationHandle,
+    self.wait_for_operation_state(queued_query_resp.operationHandle,
             TCLIService.TOperationState.PENDING_STATE)
-    # Ensure that the log message contains the queuing reason.
+    # Check whether the query log message correctly exposes the queuing status.
     get_log_req = TCLIService.TGetLogReq()
-    get_log_req.operationHandle = execute_statement_resp.operationHandle
+    get_log_req.operationHandle = queued_query_resp.operationHandle
     log = self.hs2_client.GetLog(get_log_req).log
     assert "Admission result : Queued" in log, log
     assert "Latest admission queue reason : number of running queries 1 is at or over "
     "limit 1" in log, log
+    # Now check the same for ExecSummary.
+    summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
+    summary_req.operationHandle = queued_query_resp.operationHandle
+    summary_req.sessionHandle = self.session_handle
+    exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
+    assert exec_summary_resp.summary.is_queued
+    assert "number of running queries 1 is at or over limit 1" in \
+           exec_summary_resp.summary.queued_reason,\
+      exec_summary_resp.summary.queued_reason
     # Close the running query.
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = long_query_resp.operationHandle
-    HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+    self.close(long_query_resp.operationHandle)
     # Close the queued query.
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = execute_statement_resp.operationHandle
-    HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+    self.close(queued_query_resp.operationHandle)
 
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py
new file mode 100644
index 0000000..9fde141
--- /dev/null
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import pexpect
+import os
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+
+
+class TestShellInteractive(CustomClusterTestSuite):
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests 1")
+  def test_admission_status(self):
+    """Test whether the admission status gets printed if a query gets queued when
+    either live_summary or live_progress is set to true"""
+    expected_admission_status = "Query queued. Latest queuing reason: " \
+                                "number of running queries 1 is at or over limit 1"
+    # Start a long running query so that the next one gets queued.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+    # Check with only live_summary set to true.
+    proc.expect("21000] default>")
+    proc.sendline("set live_summary=true;")
+    proc.sendline("select 1;")
+    proc.expect(expected_admission_status)
+    proc.sendcontrol('c')
+    proc.expect("Cancelling Query")
+    # Check with only live_progress set to true.
+    proc.sendline("set live_summary=false;")
+    proc.sendline("set live_progress=true;")
+    proc.sendline("select 1;")
+    proc.expect(expected_admission_status)