You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:09:40 UTC

[05/50] [abbrv] incubator-impala git commit: IMPALA-2198: Differentiate queries in exceptional states in web UI

IMPALA-2198: Differentiate queries in exceptional states in web UI

To make the query life cycle clearer to users, this change adds a
new section to the /queries web UI page for queries that are
'waiting': they are no longer actively running, either because they
hit an error or because they have returned all of their results, but
they have not been closed, so they are still using resources.

This section is labeled 'waiting to be closed' to indicate that these
queries still need to be closed even though they are not actively
running. Previously, these queries appeared in the 'in flight' list.
A tooltip next to the section heading gives a full explanation.

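The 'in_flight_queries' field of the /queries JSON endpoint still lists
every in-flight query, including waiting ones, so that CM will continue
to work as expected. Each query gains 'executing' and 'waiting' flags,
and the HTML template uses them to filter queries into the two lists.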

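This was tested manually, and automated checks of the new JSON fields
were added to tests/hs2/test_json_endpoints.py and
tests/custom_cluster/test_query_expiration.py.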

Change-Id: I47d0b642ecb573fefbbf337b8c8f2c479b0d49b2
Reviewed-on: http://gerrit.cloudera.org:8080/2625
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/68e8262c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/68e8262c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/68e8262c

Branch: refs/heads/master
Commit: 68e8262caf62aed1f514dc6f0b9f0a4a1ff9a435
Parents: 5ff8dc5
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Mar 1 13:05:01 2016 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:17:50 2016 -0700

----------------------------------------------------------------------
 be/src/service/impala-server-callbacks.cc     | 29 ++++++++
 be/src/service/impala-server.cc               |  2 +
 be/src/service/impala-server.h                |  6 ++
 tests/common/impala_service.py                |  4 +
 tests/custom_cluster/test_query_expiration.py | 14 ++++
 tests/hs2/test_json_endpoints.py              | 86 ++++++++++++++++++++++
 www/queries.tmpl                              | 48 +++++++++++-
 7 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/be/src/service/impala-server-callbacks.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server-callbacks.cc b/be/src/service/impala-server-callbacks.cc
index 208264b..89555e4 100644
--- a/be/src/service/impala-server-callbacks.cc
+++ b/be/src/service/impala-server-callbacks.cc
@@ -26,6 +26,7 @@
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/redactor.h"
 #include "util/summary-util.h"
+#include "util/time.h"
 #include "util/url-coding.h"
 
 #include "common/names.h"
@@ -275,6 +276,20 @@ void ImpalaServer::QueryStateToJson(const ImpalaServer::QueryStateRecord& record
         document->GetAllocator());
     value->AddMember("last_event", last_event, document->GetAllocator());
   }
+
+  // Waiting to be closed: the query hit an error or has returned all of its rows.
+  bool waiting = record.query_state == beeswax::QueryState::EXCEPTION ||
+      record.all_rows_returned;
+  value->AddMember("waiting", waiting, document->GetAllocator());
+  value->AddMember("executing", !waiting, document->GetAllocator());
+
+  int64_t waiting_time = impala::UnixMillis() - record.last_active_time;
+  string waiting_time_str = "";
+  if (waiting_time > 0) {
+    waiting_time_str = PrettyPrinter::Print(waiting_time, TUnit::TIME_MS);
+  }
+  Value val_waiting_time(waiting_time_str.c_str(), document->GetAllocator());
+  value->AddMember("waiting_time", val_waiting_time, document->GetAllocator());
 }
 
 void ImpalaServer::QueryStateUrlCallback(const Webserver::ArgumentMap& args,
@@ -290,15 +305,29 @@ void ImpalaServer::QueryStateUrlCallback(const Webserver::ArgumentMap& args,
   }
 
   Value in_flight_queries(kArrayType);
+  int64_t num_waiting_queries = 0;
   BOOST_FOREACH(const QueryStateRecord& record, sorted_query_records) {
     Value record_json(kObjectType);
     QueryStateToJson(record, &record_json, document);
+
+    if (record_json["waiting"].GetBool()) ++num_waiting_queries;
+
     in_flight_queries.PushBack(record_json, document->GetAllocator());
   }
   document->AddMember("in_flight_queries", in_flight_queries, document->GetAllocator());
   document->AddMember("num_in_flight_queries",
       static_cast<uint64_t>(sorted_query_records.size()),
       document->GetAllocator());
+  document->AddMember("num_executing_queries",
+      sorted_query_records.size() - num_waiting_queries,
+      document->GetAllocator());
+  document->AddMember("num_waiting_queries", num_waiting_queries,
+      document->GetAllocator());
+  document->AddMember("waiting-tooltip", "These queries are no longer executing, either "
+      "because they encountered an error or because they have returned all of their "
+      "results, but they are still active so that their results can be inspected. To "
+      "free the resources they are using, they must be closed.",
+      document->GetAllocator());
 
   Value completed_queries(kArrayType);
   {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4ba9867..8c8ebcf 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1599,6 +1599,8 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
 
   // Save the query fragments so that the plan can be visualised.
   fragments = exec_state.exec_request().query_exec_request.fragments;
+  all_rows_returned = exec_state.eos();
+  last_active_time = exec_state.last_active();
 }
 
 bool ImpalaServer::QueryStateRecord::operator() (

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 175cdba..339da4d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -606,6 +606,12 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
     /// webpages.
     vector<TPlanFragment> fragments;
 
+    /// If true, this query has returned all of its rows (it has reached eos).
+    bool all_rows_returned;
+
+    /// Most recent time this query was actively being processed, in Unix milliseconds.
+    int64_t last_active_time;
+
     /// Initialise from an exec_state. If copy_profile is true, print the query
     /// profile to a string and copy that into this.profile (which is expensive),
     /// otherwise leave this.profile empty.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 17dc37c..13ac6a3 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -98,6 +98,10 @@ class ImpaladService(BaseImpalaService):
     num = len(result['backends'])
     return None if num is None else int(num)
 
+  def get_in_flight_queries(self, timeout=30, interval=1):
+    result = json.loads(self.read_debug_webpage('queries?json', timeout, interval))
+    return result['in_flight_queries']
+
   def get_num_in_flight_queries(self, timeout=30, interval=1):
     LOG.info("Getting num_in_flight_queries from %s:%s" %
         (self.hostname, self.webserver_port))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/tests/custom_cluster/test_query_expiration.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index e942b0c..20037cb 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -24,6 +24,16 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 class TestQueryExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
 
+  def _check_num_executing(self, impalad, expected):
+    in_flight_queries = impalad.service.get_in_flight_queries()
+    actual = 0
+    for query in in_flight_queries:
+      if query["executing"]:
+        actual += 1
+      else:
+        assert query["waiting"]
+    assert actual == expected
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=6")
   def test_query_expiration(self, vector):
@@ -38,6 +48,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
     # Set a huge timeout, to check that the server bounds it by --idle_query_timeout
     client.execute("SET QUERY_TIMEOUT_S=1000")
     handle3 = client.execute_async("SELECT SLEEP(3000000)")
+    self._check_num_executing(impalad, 3)
 
     before = time()
     sleep(4)
@@ -45,6 +56,7 @@ class TestQueryExpiration(CustomClusterTestSuite):
     # Query with timeout of 1 should have expired, other query should still be running.
     assert num_expired + 1 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
+    self._check_num_executing(impalad, 2)
     impalad.service.wait_for_metric_value('impala-server.num-queries-expired',
                                           num_expired + 3)
 
@@ -62,6 +74,8 @@ class TestQueryExpiration(CustomClusterTestSuite):
     # Confirm that no extra expirations happened
     assert impalad.service.get_metric_value('impala-server.num-queries-expired') \
         == num_expired + 3
+    self._check_num_executing(impalad, 0)
+
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=0")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/tests/hs2/test_json_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/hs2/test_json_endpoints.py b/tests/hs2/test_json_endpoints.py
new file mode 100644
index 0000000..637727c
--- /dev/null
+++ b/tests/hs2/test_json_endpoints.py
@@ -0,0 +1,86 @@
+# Copyright (c) 2016 Cloudera, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tests for the JSON endpoints of the impalad web UI.
+
+import json
+import pytest
+
+from urllib2 import urlopen
+
+from tests.hs2.hs2_test_suite import HS2TestSuite
+from TCLIService import TCLIService
+
+class TestJsonEndpoints(HS2TestSuite):
+  def _get_json_queries(self, http_addr):
+    """Get the json output of the /queries page from the impalad web UI at http_addr."""
+    resp = urlopen("http://%s/queries?json" % http_addr)
+    assert resp.msg == 'OK'
+    return json.loads(resp.read())
+
+  @pytest.mark.execute_serially
+  def test_waiting_in_flight_queries(self):
+    """Confirm that the in_flight_queries endpoint shows a query at eos as waiting"""
+    open_session_req = TCLIService.TOpenSessionReq()
+    default_database = "functional"
+    open_session_req.configuration = {"use:database": default_database}
+    open_session_resp = self.hs2_client.OpenSession(open_session_req)
+    TestJsonEndpoints.check_response(open_session_resp)
+    http_addr = open_session_resp.configuration['http_addr']
+
+    # Execute a SELECT, and check that in_flight_queries shows one executing query.
+    select_statement_req = TCLIService.TExecuteStatementReq()
+    select_statement_req.sessionHandle = open_session_resp.sessionHandle
+    select_statement_req.statement = "SELECT * FROM functional.alltypes LIMIT 0"
+    select_statement_resp = self.hs2_client.ExecuteStatement(select_statement_req)
+    TestJsonEndpoints.check_response(select_statement_resp)
+    queries_json = self._get_json_queries(http_addr)
+    assert len(queries_json["in_flight_queries"]) == 1
+    assert queries_json["num_in_flight_queries"] == 1
+    assert queries_json["num_executing_queries"] == 1
+    assert queries_json["num_waiting_queries"] == 0
+    query = queries_json["in_flight_queries"][0]
+    assert query["default_db"] == default_database
+    assert query["stmt"] == select_statement_req.statement
+    assert query["stmt_type"] == "QUERY"
+    assert query["rows_fetched"] == 0
+    assert query["executing"]
+    assert not query["waiting"]
+
+    # Fetch the results, putting the query at eos, and check that in_flight_queries
+    # shows one waiting query.
+    fetch_results_req = TCLIService.TFetchResultsReq()
+    fetch_results_req.operationHandle = select_statement_resp.operationHandle
+    fetch_results_req.maxRows = 100
+    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
+    TestJsonEndpoints.check_response(fetch_results_resp)
+    queries_json = self._get_json_queries(http_addr)
+    assert len(queries_json["in_flight_queries"]) == 1
+    assert queries_json["num_in_flight_queries"] == 1
+    assert queries_json["num_executing_queries"] == 0
+    assert queries_json["num_waiting_queries"] == 1
+    query = queries_json["in_flight_queries"][0]
+    assert not query["executing"]
+    assert query["waiting"]
+
+    # Close the query and check that in_flight_queries is empty.
+    close_operation_req = TCLIService.TCloseOperationReq()
+    close_operation_req.operationHandle = select_statement_resp.operationHandle
+    close_operation_resp = self.hs2_client.CloseOperation(close_operation_req)
+    TestJsonEndpoints.check_response(close_operation_resp)
+    queries_json = self._get_json_queries(http_addr)
+    assert len(queries_json["in_flight_queries"]) == 0
+    assert queries_json["num_in_flight_queries"] == 0
+    assert queries_json["num_executing_queries"] == 0
+    assert queries_json["num_waiting_queries"] == 0

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/68e8262c/www/queries.tmpl
----------------------------------------------------------------------
diff --git a/www/queries.tmpl b/www/queries.tmpl
index 34a49fe..444e619 100644
--- a/www/queries.tmpl
+++ b/www/queries.tmpl
@@ -19,7 +19,7 @@ limitations under the License.
 archived in memory. The size of that archive is controlled with the
 <samp>--query_log_size</samp> command line parameter.</p>
 
-<h3>{{num_in_flight_queries}} queries in flight</h3>
+<h3>{{num_executing_queries}} queries in flight</h3>
 
 <table class='table table-hover table-border'>
   <tr>
@@ -36,7 +36,9 @@ archived in memory. The size of that archive is controlled with the
     <th>Details</th>
     <th>Action</th>
   </tr>
+{{! filter to get just executing queries from in_flight_queries}}
 {{#in_flight_queries}}
+{{?executing}}
   <tr>
     <td>{{effective_user}}</td>
     <td>{{default_db}}</td>
@@ -51,6 +53,50 @@ archived in memory. The size of that archive is controlled with the
     <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     <td><a href='/cancel_query?query_id={{query_id}}'>Cancel</a></td>
   </tr>
+{{/executing}}
+{{/in_flight_queries}}
+</table>
+
+<h3>
+  {{num_waiting_queries}} waiting to be closed
+  <sup><a href='#' data-toggle="tooltip" title="{{waiting-tooltip}}">[?]</a></sup>
+</h3>
+
+<table class='table table-hover table-border'>
+  <tr>
+    <th>User</th>
+    <th>Default Db</th>
+    <th>Statement</th>
+    <th>Query Type</th>
+    <th>Start Time</th>
+    <th>Waiting Time</th>
+    <th>Duration</th>
+    <th>Scan Progress</th>
+    <th>State</th>
+    <th>Last Event</th>
+    <th># rows fetched</th>
+    <th>Details</th>
+    <th>Action</th>
+  </tr>
+{{! filter to get just waiting queries from in_flight_queries}}
+{{#in_flight_queries}}
+{{?waiting}}
+  <tr>
+    <td>{{effective_user}}</td>
+    <td>{{default_db}}</td>
+    <td><samp>{{stmt}}</samp></td>
+    <td><samp>{{stmt_type}}</samp></td>
+    <td>{{start_time}}</td>
+    <td>{{waiting_time}}</td>
+    <td>{{duration}}</td>
+    <td>{{progress}}</td>
+    <td><samp>{{state}}</samp></td>
+    <td><samp>{{last_event}}</samp></td>
+    <td>{{rows_fetched}}</td>
+    <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
+    <td><a href='/cancel_query?query_id={{query_id}}'>Close</a></td>
+  </tr>
+{{/waiting}}
 {{/in_flight_queries}}
 </table>