You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@impala.apache.org by jo...@apache.org on 2020/02/13 22:21:23 UTC

[impala] 05/05: IMPALA-9182: Print the socket address of the client closing a session or cancelling a query from the WebUI

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d96dab20dd663dfdcdcd2771d2f210cb02ecd710
Author: Vincent Tran <vt...@cloudera.com>
AuthorDate: Mon Feb 10 08:57:46 2020 -0800

    IMPALA-9182: Print the socket address of the client closing a session or cancelling a query from the WebUI
    
    This change appends the socket address (HOST:PORT) of the client
    that requested closing a session or cancelling a query from the
    coordinator's debug WebUI to the resulting status message.
    
    Existing statuses:
    "Cancelled from Impala's debug web interface"
    "Session closed from Impala's debug web interface"
    
    New statuses:
    "Cancelled from Impala's debug web interface by client at
     <host>:<port>"
    "Session closed from Impala's debug web interface by client
     at <host>:<port>"
    
    Testing:
    - Verified visually that the status message is printed in the impalad
      log with the socket address when one cancels a query or closes a session.
    - Added a new e2e test to verify that the new status gets printed in the
      runtime profile and coordinator log when a query is cancelled in this
      way, and in the coordinator log when a session is closed in this way.
    - Made log asserts more robust by adding a timeout/wait value.
    
    Change-Id: Icf74ad06ce1c40fab4ce37de6b7ca78e3e520b43
    Reviewed-on: http://gerrit.cloudera.org:8080/14782
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/util/web_callback_registry.h  |  3 ++
 be/src/service/impala-http-handler.cc     |  6 ++-
 be/src/util/webserver.cc                  |  1 +
 tests/common/impala_test_suite.py         | 67 +++++++++++++++++++------------
 tests/observability/test_log_fragments.py |  2 -
 tests/webserver/test_web_pages.py         | 28 ++++++++++++-
 6 files changed, 77 insertions(+), 30 deletions(-)

diff --git a/be/src/kudu/util/web_callback_registry.h b/be/src/kudu/util/web_callback_registry.h
index 3b7ff13..d2e4ecc 100644
--- a/be/src/kudu/util/web_callback_registry.h
+++ b/be/src/kudu/util/web_callback_registry.h
@@ -65,6 +65,9 @@ class WebCallbackRegistry {
 
     // In the case of a POST, the posted data.
     std::string post_data;
+
+    // The socket address of the requester, <host>:<port>.
+    std::string source_socket;
   };
 
   // A response to an HTTP request whose body is rendered by template.
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 865a73b..48d94ce 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -212,7 +212,8 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
-  Status cause("Cancelled from Impala's debug web interface");
+  Status cause(Substitute("Cancelled from Impala's debug web interface by client at $0"
+                           , req.source_socket));
   // Web UI doesn't have access to secret so we can't validate it. We assume that
   // web UI is allowed to close queries.
   status = server_->UnregisterQuery(unique_id, true, &cause);
@@ -234,7 +235,8 @@ void ImpalaHttpHandler::CloseSessionHandler(const Webserver::WebRequest& req,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
-  Status cause("Session closed from Impala's debug web interface");
+  Status cause(Substitute("Session closed from Impala's debug web interface by client at"
+                          " $0", req.source_socket));
   // Web UI doesn't have access to secret so we can't validate it. We assume that
   // web UI is allowed to close sessions.
   status = server_->CloseSessionInternal(unique_id,
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 65f7555..c3dc7b7 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -580,6 +580,7 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
   }
 
   WebRequest req;
+  req.source_socket = GetRemoteAddress(request_info).ToString();
   if (request_info->query_string != nullptr) {
     req.query_string = request_info->query_string;
     BuildArgumentMap(request_info->query_string, &req.parsed_args);
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 5127dcc..0ac21b4 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1147,44 +1147,61 @@ class ImpalaTestSuite(BaseTestSuite):
         "Check failed to return True after {0} tries and {1} seconds{2}".format(
           count, timeout_s, error_msg_str))
 
-  def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
+  def assert_impalad_log_contains(self, level, line_regex, expected_count=1, timeout_s=6):
     """
     Convenience wrapper around assert_log_contains for impalad logs.
     """
-    self.assert_log_contains("impalad", level, line_regex, expected_count)
+    self.assert_log_contains("impalad", level, line_regex, expected_count, timeout_s)
 
-  def assert_catalogd_log_contains(self, level, line_regex, expected_count=1):
+  def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
+      timeout_s=6):
     """
     Convenience wrapper around assert_log_contains for catalogd logs.
     """
-    self.assert_log_contains("catalogd", level, line_regex, expected_count)
+    self.assert_log_contains("catalogd", level, line_regex, expected_count, timeout_s)
 
-  def assert_log_contains(self, daemon, level, line_regex, expected_count=1):
+  def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6):
     """
     Assert that the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains
     expected_count lines with a substring matching the regex. When expected_count is -1,
     at least one match is expected.
+    Retries until 'timeout_s' has expired. The default timeout is the default minicluster
+    log buffering time (5 seconds) with a one second buffer.
     When using this method to check log files of running processes, the caller should
     make sure that log buffering has been disabled, for example by adding
-    '-logbuflevel=-1' to the daemon startup options.
+    '-logbuflevel=-1' to the daemon startup options or set timeout_s to a value higher
+    than the log flush interval.
     """
     pattern = re.compile(line_regex)
-    found = 0
-    if hasattr(self, "impala_log_dir"):
-      log_dir = self.impala_log_dir
-    else:
-      log_dir = EE_TEST_LOGS_DIR
-    log_file_path = os.path.join(log_dir, daemon + "." + level)
-    # Resolve symlinks to make finding the file easier.
-    log_file_path = os.path.realpath(log_file_path)
-    with open(log_file_path) as log_file:
-      for line in log_file:
-        if pattern.search(line):
-          found += 1
-    if expected_count == -1:
-      assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
-        ", but found none." % (log_file_path, line_regex)
-    else:
-      assert found == expected_count, "Expected %d lines in file %s matching regex '%s'"\
-        ", but found %d lines. Last line was: \n%s" %\
-        (expected_count, log_file_path, line_regex, found, line)
+    start_time = time.time()
+    while True:
+      try:
+        found = 0
+        if hasattr(self, "impala_log_dir"):
+          log_dir = self.impala_log_dir
+        else:
+          log_dir = EE_TEST_LOGS_DIR
+        log_file_path = os.path.join(log_dir, daemon + "." + level)
+        # Resolve symlinks to make finding the file easier.
+        log_file_path = os.path.realpath(log_file_path)
+        with open(log_file_path) as log_file:
+          for line in log_file:
+            if pattern.search(line):
+              found += 1
+        if expected_count == -1:
+          assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
+            ", but found none." % (log_file_path, line_regex)
+        else:
+          assert found == expected_count, \
+            "Expected %d lines in file %s matching regex '%s', but found %d lines. "\
+            "Last line was: \n%s" %\
+            (expected_count, log_file_path, line_regex, found, line)
+        return
+      except AssertionError as e:
+        # Re-throw the exception to the caller only when the timeout is expired. Otherwise
+        # sleep before retrying.
+        if time.time() - start_time > timeout_s:
+          raise
+        LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
+            str(e))
+        time.sleep(1)
diff --git a/tests/observability/test_log_fragments.py b/tests/observability/test_log_fragments.py
index 81fbf53..f81d79e 100644
--- a/tests/observability/test_log_fragments.py
+++ b/tests/observability/test_log_fragments.py
@@ -40,8 +40,6 @@ class TestLogFragments(ImpalaTestSuite):
     query_id = re.search("id=([0-9a-f]+:[0-9a-f]+)",
         result.runtime_profile).groups()[0]
     self.execute_query("select 1")
-    # Logging may be buffered, so sleep to wait out the buffering.
-    time.sleep(6)
     self.assert_impalad_log_contains('INFO', query_id +
       "] Analysis and authorization finished.")
     assert query_id.endswith("000")
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 523ff12..e6cbfa6 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -17,7 +17,7 @@
 
 from tests.common.environ import ImpalaTestClusterFlagsDetector
 from tests.common.file_utils import grep_dir
-from tests.common.skip import SkipIfBuildType
+from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 import itertools
@@ -762,3 +762,29 @@ class TestWebPage(ImpalaTestSuite):
     self.get_and_check_status(self.ROOT_URL,
         "href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0], self.IMPALAD_TEST_PORT,
         regex=True, headers={'X-Forwarded-Context': '/gateway'})
+
+  @SkipIfDockerizedCluster.daemon_logs_not_exposed
+  def test_display_src_socket_in_query_cause(self):
+    # Execute a long running query then cancel it from the WebUI.
+    # Check the runtime profile and the INFO logs for the cause message.
+    query = "select sleep(10000)"
+    query_id = self.execute_query_async(query).get_handle().id
+    cancel_query_url = "{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
+      ("25000"), query_id)
+    text_profile_url = "{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL
+      .format("25000"), query_id)
+    requests.get(cancel_query_url)
+    response = requests.get(text_profile_url)
+    cancel_status = "Cancelled from Impala&apos;s debug web interface by client at"
+    assert cancel_status in response.text
+    self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web "
+      "interface by client at", expected_count=-1)
+    # Session closing from the WebUI does not produce the cause message in the profile,
+    # so we will skip checking the runtime profile.
+    results = self.execute_query("select current_session()")
+    session_id = results.data[0]
+    close_session_url = "{0}close_session?session_id={1}".format(self.ROOT_URL.format
+      ("25000"), session_id)
+    requests.get(close_session_url)
+    self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug "
+      "web interface by client at", expected_count=-1)