You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/30 15:29:22 UTC

[2/2] impala git commit: IMPALA-7735: Expose queuing status in ExecSummary and impala-shell

IMPALA-7735: Expose queuing status in ExecSummary and impala-shell

This patch adds the queuing status (that is, whether the query was
queued and what the latest queuing reason was) to the ExecSummary.
It also changes impala-shell to expose this status, pulled from the
ExecSummary, when either live_summary or live_progress is set to true.

Testing:
Added custom cluster tests covering the queuing status in the
ExecSummary and its display in impala-shell.

Change-Id: Ibde447b01559b9f0f3e970d4fa10f6ee4064bd49
Reviewed-on: http://gerrit.cloudera.org:8080/11816
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/932bf2dd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/932bf2dd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/932bf2dd

Branch: refs/heads/master
Commit: 932bf2dd8170563e7d82c8711a7b89f62bc2f6cb
Parents: bae27ed
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Oct 22 16:58:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Oct 30 02:02:37 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc                 | 13 +++++
 common/thrift/ExecStats.thrift                  |  7 +++
 shell/impala_shell.py                           | 11 ++++-
 .../custom_cluster/test_admission_controller.py | 44 ++++++++---------
 tests/custom_cluster/test_shell_interactive.py  | 51 ++++++++++++++++++++
 5 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e7928f4..9fa05e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -59,6 +59,7 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
@@ -738,6 +739,18 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
       if (request_state->operation_state() == TOperationState::PENDING_STATE) {
+        const string* admission_result = request_state->summary_profile()->GetInfoString(
+            AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
+        if (admission_result != nullptr) {
+          if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) {
+            result->__set_is_queued(true);
+            const string* queued_reason = request_state->summary_profile()->GetInfoString(
+                AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON);
+            if (queued_reason != nullptr) {
+              result->__set_queued_reason(*queued_reason);
+            }
+          }
+        }
         return Status::OK();
       } else if (request_state->GetCoordinator() != nullptr) {
         request_state->GetCoordinator()->GetTExecSummary(result);

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 68a8fd8..9c759e9 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -100,4 +100,11 @@ struct TExecSummary {
 
   // Optional record indicating the query progress
   6: optional TExecProgress progress
+
+  // Set to true if the query is currently queued by admission control.
+  7: optional bool is_queued
+
+  // Contains the latest queuing reason if the query is currently queued by admission
+  // control.
+  8: optional string queued_reason
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 9277071..4f09bd8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -963,7 +963,16 @@ class ImpalaShell(object, cmd.Cmd):
     checkpoint = time.time()
     if checkpoint - self.last_summary > self.PROGRESS_UPDATE_INTERVAL:
       summary = self.imp_client.get_summary(self.last_query_handle)
-      if summary and summary.progress:
+      if not summary:
+        return
+
+      if summary.is_queued:
+        queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
+        self.progress_stream.write(queued_msg)
+        self.last_summary = time.time()
+        return
+
+      if summary.progress:
         progress = summary.progress
 
         # If the data is not complete return and wait for a good result.

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 018f66d..6b3100f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -824,41 +824,39 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
           pool_max_mem=1024 * 1024 * 1024))
   @needs_session()
-  def test_hs2_admission_controller_logs(self):
-    """Test to verify that the GetLog() function invoked by the HS2 client returns the
-    reason for queuing of the query."""
+  def test_queuing_status_through_query_log_and_exec_summary(self):
+    """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
+    the query's queuing status, that is, whether the query was queued and what was the
+    latest queuing reason."""
     # Start a long running query.
-    long_query_req = TCLIService.TExecuteStatementReq()
-    long_query_req.sessionHandle = self.session_handle
-    long_query_req.statement = "select sleep(1000000)"
-    long_query_resp = self.hs2_client.ExecuteStatement(long_query_req)
-    HS2TestSuite.check_response(long_query_resp)
-    # Ensure that the query is running.
+    long_query_resp = self.execute_statement("select sleep(10000)")
+    # Ensure that the query has started executing.
     self.wait_for_admission_control(long_query_resp.operationHandle)
     # Submit another query.
-    execute_statement_req = TCLIService.TExecuteStatementReq()
-    execute_statement_req.sessionHandle = self.session_handle
-    execute_statement_req.statement = "select 1"
-    execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-    HS2TestSuite.check_response(execute_statement_resp)
+    queued_query_resp = self.execute_statement("select 1")
     # Wait until the query is queued.
-    self.wait_for_operation_state(execute_statement_resp.operationHandle,
+    self.wait_for_operation_state(queued_query_resp.operationHandle,
             TCLIService.TOperationState.PENDING_STATE)
-    # Ensure that the log message contains the queuing reason.
+    # Check whether the query log message correctly exposes the queuing status.
     get_log_req = TCLIService.TGetLogReq()
-    get_log_req.operationHandle = execute_statement_resp.operationHandle
+    get_log_req.operationHandle = queued_query_resp.operationHandle
     log = self.hs2_client.GetLog(get_log_req).log
     assert "Admission result : Queued" in log, log
     assert "Latest admission queue reason : number of running queries 1 is at or over "
     "limit 1" in log, log
+    # Now check the same for ExecSummary.
+    summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
+    summary_req.operationHandle = queued_query_resp.operationHandle
+    summary_req.sessionHandle = self.session_handle
+    exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
+    assert exec_summary_resp.summary.is_queued
+    assert "number of running queries 1 is at or over limit 1" in \
+           exec_summary_resp.summary.queued_reason,\
+      exec_summary_resp.summary.queued_reason
     # Close the running query.
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = long_query_resp.operationHandle
-    HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+    self.close(long_query_resp.operationHandle)
     # Close the queued query.
-    close_operation_req = TCLIService.TCloseOperationReq()
-    close_operation_req.operationHandle = execute_statement_resp.operationHandle
-    HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+    self.close(queued_query_resp.operationHandle)
 
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):

http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py
new file mode 100644
index 0000000..9fde141
--- /dev/null
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import pexpect
+import os
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+
+
+class TestShellInteractive(CustomClusterTestSuite):
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests 1")
+  def test_admission_status(self):
+    """Test whether the admission status gets printed if a query gets queued when
+    either live_summary or live_progress is set to true"""
+    expected_admission_status = "Query queued. Latest queuing reason: " \
+                                "number of running queries 1 is at or over limit 1"
+    # Start a long running query so that the next one gets queued.
+    sleep_query_handle = self.client.execute_async("select sleep(10000)")
+    self.client.wait_for_admission_control(sleep_query_handle)
+    proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+    # Check with only live_summary set to true.
+    proc.expect("21000] default>")
+    proc.sendline("set live_summary=true;")
+    proc.sendline("select 1;")
+    proc.expect(expected_admission_status)
+    proc.sendcontrol('c')
+    proc.expect("Cancelling Query")
+    # Check with only live_progress set to true.
+    proc.sendline("set live_summary=false;")
+    proc.sendline("set live_progress=true;")
+    proc.sendline("select 1;")
+    proc.expect(expected_admission_status)