You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2020/02/13 22:21:23 UTC
[impala] 05/05: IMPALA-9182: Print the socket address of the client closing a session or cancelling a query from the WebUI
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit d96dab20dd663dfdcdcd2771d2f210cb02ecd710
Author: Vincent Tran <vt...@cloudera.com>
AuthorDate: Mon Feb 10 08:57:46 2020 -0800
IMPALA-9182: Print the socket address of the client closing a session or cancelling a query from the WebUI
This change appends the socket address (<host>:<port>) of the client
that requested closing a session or cancelling a query through the
coordinator's debug WebUI to the resulting status message.
Existing statuses:
"Cancelled from Impala's debug web interface"
"Session closed from Impala's debug web interface"
New statuses:
"Cancelled from Impala's debug web interface by client at <host>:<port>"
"Session closed from Impala's debug web interface by client at <host>:<port>"
Testing:
- Verified visually that the status message, including the client's
  socket address, is printed in the impalad log when a query is
  cancelled or a session is closed from the WebUI.
- Added a new e2e test to verify that the new statuses are printed in
  the coordinator log when a query is cancelled or a session is closed
  from the WebUI, and that the cancellation status also appears in the
  query's runtime profile.
- Made log assertions more robust by retrying them until a timeout
  (6 seconds by default) expires, instead of relying on a fixed sleep.
Change-Id: Icf74ad06ce1c40fab4ce37de6b7ca78e3e520b43
Reviewed-on: http://gerrit.cloudera.org:8080/14782
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/kudu/util/web_callback_registry.h | 3 ++
be/src/service/impala-http-handler.cc | 6 ++-
be/src/util/webserver.cc | 1 +
tests/common/impala_test_suite.py | 67 +++++++++++++++++++------------
tests/observability/test_log_fragments.py | 2 -
tests/webserver/test_web_pages.py | 28 ++++++++++++-
6 files changed, 77 insertions(+), 30 deletions(-)
diff --git a/be/src/kudu/util/web_callback_registry.h b/be/src/kudu/util/web_callback_registry.h
index 3b7ff13..d2e4ecc 100644
--- a/be/src/kudu/util/web_callback_registry.h
+++ b/be/src/kudu/util/web_callback_registry.h
@@ -65,6 +65,9 @@ class WebCallbackRegistry {
// In the case of a POST, the posted data.
std::string post_data;
+
+ // The socket address of the requester, <host>:<port>.
+ std::string source_socket;
};
// A response to an HTTP request whose body is rendered by template.
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 865a73b..48d94ce 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -212,7 +212,8 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
document->AddMember("error", error, document->GetAllocator());
return;
}
- Status cause("Cancelled from Impala's debug web interface");
+ Status cause(Substitute("Cancelled from Impala's debug web interface by client at $0"
+ , req.source_socket));
// Web UI doesn't have access to secret so we can't validate it. We assume that
// web UI is allowed to close queries.
status = server_->UnregisterQuery(unique_id, true, &cause);
@@ -234,7 +235,8 @@ void ImpalaHttpHandler::CloseSessionHandler(const Webserver::WebRequest& req,
document->AddMember("error", error, document->GetAllocator());
return;
}
- Status cause("Session closed from Impala's debug web interface");
+ Status cause(Substitute("Session closed from Impala's debug web interface by client at"
+ " $0", req.source_socket));
// Web UI doesn't have access to secret so we can't validate it. We assume that
// web UI is allowed to close sessions.
status = server_->CloseSessionInternal(unique_id,
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 65f7555..c3dc7b7 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -580,6 +580,7 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
}
WebRequest req;
+ req.source_socket = GetRemoteAddress(request_info).ToString();
if (request_info->query_string != nullptr) {
req.query_string = request_info->query_string;
BuildArgumentMap(request_info->query_string, &req.parsed_args);
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 5127dcc..0ac21b4 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1147,44 +1147,61 @@ class ImpalaTestSuite(BaseTestSuite):
"Check failed to return True after {0} tries and {1} seconds{2}".format(
count, timeout_s, error_msg_str))
- def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
+ def assert_impalad_log_contains(self, level, line_regex, expected_count=1, timeout_s=6):
"""
Convenience wrapper around assert_log_contains for impalad logs.
"""
- self.assert_log_contains("impalad", level, line_regex, expected_count)
+ self.assert_log_contains("impalad", level, line_regex, expected_count, timeout_s)
- def assert_catalogd_log_contains(self, level, line_regex, expected_count=1):
+ def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
+ timeout_s=6):
"""
Convenience wrapper around assert_log_contains for catalogd logs.
"""
- self.assert_log_contains("catalogd", level, line_regex, expected_count)
+ self.assert_log_contains("catalogd", level, line_regex, expected_count, timeout_s)
- def assert_log_contains(self, daemon, level, line_regex, expected_count=1):
+ def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6):
"""
Assert that the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains
expected_count lines with a substring matching the regex. When expected_count is -1,
at least one match is expected.
+ Retries until 'timeout_s' has expired. The default timeout is the default minicluster
+ log buffering time (5 seconds) with a one second buffer.
When using this method to check log files of running processes, the caller should
make sure that log buffering has been disabled, for example by adding
- '-logbuflevel=-1' to the daemon startup options.
+ '-logbuflevel=-1' to the daemon startup options or set timeout_s to a value higher
+ than the log flush interval.
"""
pattern = re.compile(line_regex)
- found = 0
- if hasattr(self, "impala_log_dir"):
- log_dir = self.impala_log_dir
- else:
- log_dir = EE_TEST_LOGS_DIR
- log_file_path = os.path.join(log_dir, daemon + "." + level)
- # Resolve symlinks to make finding the file easier.
- log_file_path = os.path.realpath(log_file_path)
- with open(log_file_path) as log_file:
- for line in log_file:
- if pattern.search(line):
- found += 1
- if expected_count == -1:
- assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
- ", but found none." % (log_file_path, line_regex)
- else:
- assert found == expected_count, "Expected %d lines in file %s matching regex '%s'"\
- ", but found %d lines. Last line was: \n%s" %\
- (expected_count, log_file_path, line_regex, found, line)
+ start_time = time.time()
+ while True:
+ try:
+ found = 0
+ if hasattr(self, "impala_log_dir"):
+ log_dir = self.impala_log_dir
+ else:
+ log_dir = EE_TEST_LOGS_DIR
+ log_file_path = os.path.join(log_dir, daemon + "." + level)
+ # Resolve symlinks to make finding the file easier.
+ log_file_path = os.path.realpath(log_file_path)
+ with open(log_file_path) as log_file:
+ for line in log_file:
+ if pattern.search(line):
+ found += 1
+ if expected_count == -1:
+ assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
+ ", but found none." % (log_file_path, line_regex)
+ else:
+ assert found == expected_count, \
+ "Expected %d lines in file %s matching regex '%s', but found %d lines. "\
+ "Last line was: \n%s" %\
+ (expected_count, log_file_path, line_regex, found, line)
+ return
+ except AssertionError as e:
+ # Re-throw the exception to the caller only when the timeout is expired. Otherwise
+ # sleep before retrying.
+ if time.time() - start_time > timeout_s:
+ raise
+ LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
+ str(e))
+ time.sleep(1)
diff --git a/tests/observability/test_log_fragments.py b/tests/observability/test_log_fragments.py
index 81fbf53..f81d79e 100644
--- a/tests/observability/test_log_fragments.py
+++ b/tests/observability/test_log_fragments.py
@@ -40,8 +40,6 @@ class TestLogFragments(ImpalaTestSuite):
query_id = re.search("id=([0-9a-f]+:[0-9a-f]+)",
result.runtime_profile).groups()[0]
self.execute_query("select 1")
- # Logging may be buffered, so sleep to wait out the buffering.
- time.sleep(6)
self.assert_impalad_log_contains('INFO', query_id +
"] Analysis and authorization finished.")
assert query_id.endswith("000")
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 523ff12..e6cbfa6 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -17,7 +17,7 @@
from tests.common.environ import ImpalaTestClusterFlagsDetector
from tests.common.file_utils import grep_dir
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
import itertools
@@ -762,3 +762,29 @@ class TestWebPage(ImpalaTestSuite):
self.get_and_check_status(self.ROOT_URL,
"href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0], self.IMPALAD_TEST_PORT,
regex=True, headers={'X-Forwarded-Context': '/gateway'})
+
+ @SkipIfDockerizedCluster.daemon_logs_not_exposed
+ def test_display_src_socket_in_query_cause(self):
+ # Execute a long running query then cancel it from the WebUI.
+ # Check the runtime profile and the INFO logs for the cause message.
+ query = "select sleep(10000)"
+ query_id = self.execute_query_async(query).get_handle().id
+ cancel_query_url = "{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
+ ("25000"), query_id)
+ text_profile_url = "{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL
+ .format("25000"), query_id)
+ requests.get(cancel_query_url)
+ response = requests.get(text_profile_url)
+ cancel_status = "Cancelled from Impala's debug web interface by client at"
+ assert cancel_status in response.text
+ self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web "
+ "interface by client at", expected_count=-1)
+ # Session closing from the WebUI does not produce the cause message in the profile,
+ # so we will skip checking the runtime profile.
+ results = self.execute_query("select current_session()")
+ session_id = results.data[0]
+ close_session_url = "{0}close_session?session_id={1}".format(self.ROOT_URL.format
+ ("25000"), session_id)
+ requests.get(close_session_url)
+ self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug "
+ "web interface by client at", expected_count=-1)