You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/30 15:29:22 UTC
[2/2] impala git commit: IMPALA-7735: Expose queuing status in
ExecSummary and impala-shell
IMPALA-7735: Expose queuing status in ExecSummary and impala-shell
This patch adds the queuing status to the ExecSummary, that is, whether
the query is currently queued and what the latest queuing reason is.
It also changes impala-shell to display this status, which it reads
from the ExecSummary, when either live_summary or live_progress is set
to true.
Testing:
Added custom cluster tests.
Change-Id: Ibde447b01559b9f0f3e970d4fa10f6ee4064bd49
Reviewed-on: http://gerrit.cloudera.org:8080/11816
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/932bf2dd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/932bf2dd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/932bf2dd
Branch: refs/heads/master
Commit: 932bf2dd8170563e7d82c8711a7b89f62bc2f6cb
Parents: bae27ed
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Mon Oct 22 16:58:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Oct 30 02:02:37 2018 +0000
----------------------------------------------------------------------
be/src/service/impala-server.cc | 13 +++++
common/thrift/ExecStats.thrift | 7 +++
shell/impala_shell.py | 11 ++++-
.../custom_cluster/test_admission_controller.py | 44 ++++++++---------
tests/custom_cluster/test_shell_interactive.py | 51 ++++++++++++++++++++
5 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index e7928f4..9fa05e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -59,6 +59,7 @@
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
#include "service/impala-http-handler.h"
#include "service/impala-internal-service.h"
@@ -738,6 +739,18 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
request_state->user_has_profile_access()));
if (request_state->operation_state() == TOperationState::PENDING_STATE) {
+ const string* admission_result = request_state->summary_profile()->GetInfoString(
+ AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT);
+ if (admission_result != nullptr) {
+ if (*admission_result == AdmissionController::PROFILE_INFO_VAL_QUEUED) {
+ result->__set_is_queued(true);
+ const string* queued_reason = request_state->summary_profile()->GetInfoString(
+ AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON);
+ if (queued_reason != nullptr) {
+ result->__set_queued_reason(*queued_reason);
+ }
+ }
+ }
return Status::OK();
} else if (request_state->GetCoordinator() != nullptr) {
request_state->GetCoordinator()->GetTExecSummary(result);
http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 68a8fd8..9c759e9 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -100,4 +100,11 @@ struct TExecSummary {
// Optional record indicating the query progress
6: optional TExecProgress progress
+
+ // Set to true if the query is currently queued by admission control.
+ 7: optional bool is_queued
+
+ // Contains the latest queuing reason if the query is currently queued by admission
+ // control.
+ 8: optional string queued_reason
}
http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 9277071..4f09bd8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -963,7 +963,16 @@ class ImpalaShell(object, cmd.Cmd):
checkpoint = time.time()
if checkpoint - self.last_summary > self.PROGRESS_UPDATE_INTERVAL:
summary = self.imp_client.get_summary(self.last_query_handle)
- if summary and summary.progress:
+ if not summary:
+ return
+
+ if summary.is_queued:
+ queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
+ self.progress_stream.write(queued_msg)
+ self.last_summary = time.time()
+ return
+
+ if summary.progress:
progress = summary.progress
# If the data is not complete return and wait for a good result.
http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 018f66d..6b3100f 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -824,41 +824,39 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
pool_max_mem=1024 * 1024 * 1024))
@needs_session()
- def test_hs2_admission_controller_logs(self):
- """Test to verify that the GetLog() function invoked by the HS2 client returns the
- reason for queuing of the query."""
+ def test_queuing_status_through_query_log_and_exec_summary(self):
+ """Test to verify that the HS2 client's GetLog() call and the ExecSummary expose
+ the query's queuing status, that is, whether the query was queued and what was the
+ latest queuing reason."""
# Start a long running query.
- long_query_req = TCLIService.TExecuteStatementReq()
- long_query_req.sessionHandle = self.session_handle
- long_query_req.statement = "select sleep(1000000)"
- long_query_resp = self.hs2_client.ExecuteStatement(long_query_req)
- HS2TestSuite.check_response(long_query_resp)
- # Ensure that the query is running.
+ long_query_resp = self.execute_statement("select sleep(10000)")
+ # Ensure that the query has started executing.
self.wait_for_admission_control(long_query_resp.operationHandle)
# Submit another query.
- execute_statement_req = TCLIService.TExecuteStatementReq()
- execute_statement_req.sessionHandle = self.session_handle
- execute_statement_req.statement = "select 1"
- execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
- HS2TestSuite.check_response(execute_statement_resp)
+ queued_query_resp = self.execute_statement("select 1")
# Wait until the query is queued.
- self.wait_for_operation_state(execute_statement_resp.operationHandle,
+ self.wait_for_operation_state(queued_query_resp.operationHandle,
TCLIService.TOperationState.PENDING_STATE)
- # Ensure that the log message contains the queuing reason.
+ # Check whether the query log message correctly exposes the queuing status.
get_log_req = TCLIService.TGetLogReq()
- get_log_req.operationHandle = execute_statement_resp.operationHandle
+ get_log_req.operationHandle = queued_query_resp.operationHandle
log = self.hs2_client.GetLog(get_log_req).log
assert "Admission result : Queued" in log, log
assert "Latest admission queue reason : number of running queries 1 is at or over "
"limit 1" in log, log
+ # Now check the same for ExecSummary.
+ summary_req = ImpalaHiveServer2Service.TGetExecSummaryReq()
+ summary_req.operationHandle = queued_query_resp.operationHandle
+ summary_req.sessionHandle = self.session_handle
+ exec_summary_resp = self.hs2_client.GetExecSummary(summary_req)
+ assert exec_summary_resp.summary.is_queued
+ assert "number of running queries 1 is at or over limit 1" in \
+ exec_summary_resp.summary.queued_reason,\
+ exec_summary_resp.summary.queued_reason
# Close the running query.
- close_operation_req = TCLIService.TCloseOperationReq()
- close_operation_req.operationHandle = long_query_resp.operationHandle
- HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+ self.close(long_query_resp.operationHandle)
# Close the queued query.
- close_operation_req = TCLIService.TCloseOperationReq()
- close_operation_req.operationHandle = execute_statement_resp.operationHandle
- HS2TestSuite.check_response(self.hs2_client.CloseOperation(close_operation_req))
+ self.close(queued_query_resp.operationHandle)
class TestAdmissionControllerStress(TestAdmissionControllerBase):
http://git-wip-us.apache.org/repos/asf/impala/blob/932bf2dd/tests/custom_cluster/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py
new file mode 100644
index 0000000..9fde141
--- /dev/null
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import pexpect
+import os
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+
+
+class TestShellInteractive(CustomClusterTestSuite):
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests 1")
+ def test_admission_status(self):
+ """Test whether the admission status gets printed if a query gets queued when
+ either live_summary or live_progress is set to true"""
+ expected_admission_status = "Query queued. Latest queuing reason: " \
+ "number of running queries 1 is at or over limit 1"
+ # Start a long running query so that the next one gets queued.
+ sleep_query_handle = self.client.execute_async("select sleep(10000)")
+ self.client.wait_for_admission_control(sleep_query_handle)
+ proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+ # Check with only live_summary set to true.
+ proc.expect("21000] default>")
+ proc.sendline("set live_summary=true;")
+ proc.sendline("select 1;")
+ proc.expect(expected_admission_status)
+ proc.sendcontrol('c')
+ proc.expect("Cancelling Query")
+ # Check with only live_progress set to true.
+ proc.sendline("set live_summary=false;")
+ proc.sendline("set live_progress=true;")
+ proc.sendline("select 1;")
+ proc.expect(expected_admission_status)