Posted to commits@impala.apache.org by la...@apache.org on 2021/07/29 16:06:23 UTC

[impala] branch master updated (46f1343 -> 780a892)

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 46f1343  IMPALA-10821 Fix TestTPCHJoinQueries.test_outer_joins failed
     new d59a79b  IMPALA-10823: Output less information when external frontend is used
     new 06c9016  IMPALA-8762: Track host level admission stats across all coordinators
     new 8a2758f  IMPALA-10779: Print the username closing a session or cancelling a query from the WebUI
     new 780a892  IMPALA-10813: Invalidate external table from catalog cache for truncate table HMS api

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/kudu/util/web_callback_registry.h           |   3 +
 be/src/scheduling/admission-controller.cc          | 177 +++++++++++++++------
 be/src/scheduling/admission-controller.h           |  33 ++--
 be/src/service/impala-hs2-server.cc                |   3 +-
 be/src/service/impala-http-handler.cc              |   8 +-
 be/src/util/thrift-debug-util.h                    |  14 +-
 be/src/util/webserver.cc                           |   3 +
 common/thrift/StatestoreService.thrift             |  28 ++++
 common/thrift/metrics.json                         |   4 +-
 .../catalog/metastore/MetastoreServiceHandler.java |  10 +-
 .../apache/impala/customcluster/LdapHS2Test.java   |  13 ++
 .../impala/customcluster/LdapWebserverTest.java    |  69 ++++++++
 tests/common/custom_cluster_test_suite.py          |   8 +-
 tests/custom_cluster/test_catalog_wait.py          |   2 +-
 tests/custom_cluster/test_executor_groups.py       |  79 ++++++++-
 tests/custom_cluster/test_metastore_service.py     |  50 +++++-
 tests/custom_cluster/test_runtime_profile.py       |   2 +-
 tests/custom_cluster/test_scratch_disk.py          |  14 +-
 tests/webserver/test_web_pages.py                  |   7 +-
 19 files changed, 423 insertions(+), 104 deletions(-)

[impala] 04/04: IMPALA-10813: Invalidate external table from catalog cache for truncate table HMS api

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 780a892f5700719e59cc49bafcc46460361673de
Author: Sourabh Goyal <so...@cloudera.com>
AuthorDate: Thu Jul 15 09:45:14 2021 -0700

    IMPALA-10813: Invalidate external table from catalog cache for
    truncate table HMS api
    
    This patch is a continuation of IMPALA-10648, in which we missed
    invalidating the external table for the truncate_table API.
    
    Testing:
    Enhanced the existing test to include a truncate_table scenario.
    
    Change-Id: I734c2b5f371291fef32badab9efc886b4b067e10
    Reviewed-on: http://gerrit.cloudera.org:8080/17705
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../catalog/metastore/MetastoreServiceHandler.java | 10 +++--
 tests/custom_cluster/test_metastore_service.py     | 50 +++++++++++++++++++---
 2 files changed, 51 insertions(+), 9 deletions(-)
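
The pattern the Java change below applies, in rough Python terms (a sketch
with hypothetical stand-ins 'hms_client' and 'catalog_cache', not Impala
code): forward the truncate call to HMS first, then drop the table from the
local catalog cache so the next request reloads fresh metadata.

    def truncate_table_req(req, hms_client, catalog_cache):
        # Forward the truncate call to the backing Hive Metastore first.
        resp = hms_client.truncate_table_req(req)
        # Then drop the cached (non-transactional) table, if present, so
        # stale file metadata is not served on the next read.
        catalog_cache.pop((req.dbName, req.tableName), None)
        return resp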

diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 3ecd516..a57c17f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -673,15 +673,19 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor
       throws MetaException, TException {
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
       client.getHiveClient().getThriftClient().truncate_table(dbName, tblName, partNames);
+      invalidateNonTransactionalTableIfExists(dbName, tblName, "truncate_table");
     }
   }
 
   @Override
   public TruncateTableResponse truncate_table_req(
-      TruncateTableRequest truncateTableRequest) throws MetaException, TException {
+      TruncateTableRequest req) throws MetaException, TException {
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
-      return client.getHiveClient().getThriftClient()
-          .truncate_table_req(truncateTableRequest);
+      TruncateTableResponse resp = client.getHiveClient().getThriftClient()
+          .truncate_table_req(req);
+      invalidateNonTransactionalTableIfExists(req.getDbName(),
+          req.getTableName(), "truncate_table_req");
+      return resp;
     }
   }
 
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
index c7cf7ee..0251032 100644
--- a/tests/custom_cluster/test_metastore_service.py
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -20,6 +20,7 @@ from hive_metastore.ttypes import Database
 from hive_metastore.ttypes import FieldSchema
 from hive_metastore.ttypes import GetTableRequest
 from hive_metastore.ttypes import GetPartitionsByNamesRequest
+from hive_metastore.ttypes import TruncateTableRequest
 from hive_metastore.ttypes import Table
 from hive_metastore.ttypes import StorageDescriptor
 from hive_metastore.ttypes import SerDeInfo
@@ -439,6 +440,13 @@ class TestMetastoreService(CustomClusterTestSuite):
               and then subsequent get_partitions_by_names_req should load table from HMS
               If invalidateCache is False, the get_partitions_by_names_req
               should be served from the already cached table
+        2. Truncate a table partition:
+              If invalidateCache is True, then table should be removed from cache
+              and then subsequent get_partitions_by_names_req should load table from HMS
+              and the file metadata for the truncated partition should be none
+              If invalidateCache is False, the get_partitions_by_names_req
+              should be served from the already cached table which has stale
+              partition file metadata
         3. Alter table rename:
               If invalidateCache is True, this should remove the old table from the cache
               and add new table. Subsequent get_table req on old table should throw an
@@ -476,6 +484,8 @@ class TestMetastoreService(CustomClusterTestSuite):
                 "insert into {0}.{1} PARTITION (part_col=1) VALUES (1)".format(
                     db_name, tbl_name),
                 "insert into {0}.{1} PARTITION (part_col=2) VALUES (2)".format(
+                    db_name, tbl_name),
+                "insert into {0}.{1} PARTITION (part_col=3) VALUES (3)".format(
                     db_name, tbl_name)
             ]
             for query in insert_queries:
@@ -492,11 +502,11 @@ class TestMetastoreService(CustomClusterTestSuite):
             partitions_response = catalog_hms_client.get_partition_names(
                 db_name, tbl_name, -1)
             assert partitions_response is not None
-            assert len(partitions_response) == 2
+            assert len(partitions_response) == 3
 
             # drop a partition
             catalog_hms_client.drop_partition_by_name(
-                db_name, tbl_name, "part_col=2", True)
+                db_name, tbl_name, "part_col=3", True)
 
             # save cur_get_table_response in prev_get_table_response
             # before calling get_table_req HMS api
@@ -508,21 +518,49 @@ class TestMetastoreService(CustomClusterTestSuite):
                 new_get_table_request)
             assert cur_get_table_response.table is not None
 
-            part_col_names = ["part_col=1", "part_col=2"]
+            part_col_names = ["part_col=1", "part_col=2", "part_col=3"]
             get_parts_req = GetPartitionsByNamesRequest()
             get_parts_req.db_name = db_name
             get_parts_req.tbl_name = tbl_name
             get_parts_req.names = part_col_names
+            get_parts_req.getFileMetadata = True
             parts_response = catalog_hms_client.get_partitions_by_names_req(
                 get_parts_req)
             if invalidateCache:
                 # drop_partition_by_name hms api should invalidate
                 # table from cache and reload new table from HMS
-                len(parts_response.partitions) == 1
+                assert len(parts_response.partitions) == 2
             else:
                 # table should be served from the cache
-                # and the cached table has 2 partitions
-                len(parts_response.partitions) == 2
+                # and the cached table has 3 partitions
+                assert len(parts_response.partitions) == 3
+
+            # Truncate table by removing a partition
+            part_to_truncate = ["part_col=2"]
+            truncate_table_req = TruncateTableRequest()
+            truncate_table_req.dbName = db_name
+            truncate_table_req.tableName = tbl_name
+            truncate_table_req.partNames = part_to_truncate
+            catalog_hms_client.truncate_table_req(truncate_table_req)
+
+            # Check partition's file metadata after truncating it
+            # for table invalidated from cache, partition
+            # file metadata is none whereas it is not none
+            # for stale table present in cache
+            get_parts_req = GetPartitionsByNamesRequest()
+            get_parts_req.db_name = db_name
+            get_parts_req.tbl_name = tbl_name
+            get_parts_req.names = part_to_truncate
+            get_parts_req.getFileMetadata = True
+            parts_response = catalog_hms_client.get_partitions_by_names_req(
+                get_parts_req)
+            assert len(parts_response.partitions) == 1
+            for part in parts_response.partitions:
+                assert part.fileMetadata is not None
+                if invalidateCache:
+                    assert part.fileMetadata.data is None
+                else:
+                    assert part.fileMetadata.data is not None
 
             # alter current table by renaming
             # the table name

[impala] 01/04: IMPALA-10823: Output less information when external frontend is used

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d59a79b1a23bca298dcf036e8fd45f5113241100
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Fri Jul 23 11:41:14 2021 +0200

    IMPALA-10823: Output less information when external frontend is used
    
    We use ThriftDebugString() to output a TExecutePlannedStatementReq.
    TExecutePlannedStatementReq can be quite large since it contains a
    Frontend.TExecRequest as well.
    
    We only need to output a redacted version of
    TExecutePlannedStatementReq.statementReq. At a higher log level we
    also output TExecutePlannedStatementReq.plan.
    
    Change-Id: Ib862bfa298855943037afef53990160dcc8321ad
    Reviewed-on: http://gerrit.cloudera.org:8080/17718
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-hs2-server.cc |  3 ++-
 be/src/util/thrift-debug-util.h     | 14 +++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
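
In rough Python terms, the logging shape this introduces looks like the
sketch below ('redact' is a hypothetical stand-in for the secret-redacting
formatter; the verbosity handling mirrors VLOG_QUERY vs. VLOG(3) below):

    def redact(value):
        # Stand-in for the real secret-redacting formatter.
        return "<redacted>"

    def describe_request(req, verbosity=1):
        # Render only the redacted statementReq at the default level; the
        # potentially huge plan is rendered separately at verbosity >= 3.
        msg = ("TExecutePlannedStatementReq {\n"
               "  01: statementReq (struct) = %s,\n"
               "  *** OTHER FIELDS ARE OMITTED ***\n"
               "}" % redact(req.statementReq))
        if verbosity >= 3:
            msg += "\nplan=%s" % (req.plan,)
        return msg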

diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 84aee0b..987dd70 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -556,7 +556,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
 void ImpalaServer::ExecutePlannedStatement(
       TExecuteStatementResp& return_val,
       const TExecutePlannedStatementReq& request) {
-  VLOG_QUERY << "ExecutePlannedStatement(): request=" << ThriftDebugString(request);
+  VLOG_QUERY << "ExecutePlannedStatement(): request=" << RedactedDebugString(request);
+  VLOG(3) << "ExecutePlannedStatement(): plan=" << ThriftDebugString(request.plan);
   const ThriftServer::ConnectionContext* connection_context =
       ThriftServer::GetThreadConnectionContext();
   // This RPC is only supported on the external frontend service and should only be
diff --git a/be/src/util/thrift-debug-util.h b/be/src/util/thrift-debug-util.h
index da07a1e..863489e 100644
--- a/be/src/util/thrift-debug-util.h
+++ b/be/src/util/thrift-debug-util.h
@@ -18,6 +18,8 @@
 #pragma once
 #include <string>
 #include <thrift/protocol/TDebugProtocol.h>
+
+#include "gutil/strings/substitute.h"
 #include "util/debug-util.h"
 
 DECLARE_string(debug_actions);
@@ -61,7 +63,8 @@ struct HasSecret {
   static constexpr bool value =
       HasSessionHandle<T>::value ||
       HasOperationHandle<T>::value ||
-      std::is_same<T, apache::hive::service::cli::thrift::THandleIdentifier>::value;
+      std::is_same<T, apache::hive::service::cli::thrift::THandleIdentifier>::value ||
+      std::is_same<T, impala::TExecutePlannedStatementReq>::value;
 };
 
 template <typename T,
@@ -132,6 +135,15 @@ inline std::string RedactedDebugString(const T& t) {
   return apache::thrift::ThriftDebugString(copy);
 }
 
+inline std::string RedactedDebugString(const TExecutePlannedStatementReq& req) {
+  static const char* ret_pattern =
+      "TExecutePlannedStatementReq {\n"
+      "  01: statementReq (struct) = $0,\n"
+      "  *** OTHER FIELDS ARE OMITTED ***\n"
+      "}";
+  return strings::Substitute(ret_pattern, RedactedDebugString(req.statementReq));
+}
+
 /// Raise compile-time error when ThriftDebugStringNoThrow() is used on an object that has
 /// a session secret.
 template <typename T,

[impala] 02/04: IMPALA-8762: Track host level admission stats across all coordinators

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 06c9016a37edc1e5514378dbe17a679e294a0942
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Thu Jul 1 17:02:39 2021 -0700

    IMPALA-8762: Track host level admission stats across all coordinators
    
    This patch adds the ability to share the per-host stats for locally
    admitted queries across all coordinators. This helps to get a more
    consolidated view of the cluster for stats like slots_in_use and
    mem_admitted when making local admission decisions.
    
    Testing:
    Added e2e py test
    
    Change-Id: I2946832e0a89b077d0f3bec755e4672be2088243
    Reviewed-on: http://gerrit.cloudera.org:8080/17683
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc    | 177 ++++++++++++++++++++-------
 be/src/scheduling/admission-controller.h     |  33 ++---
 common/thrift/StatestoreService.thrift       |  28 +++++
 common/thrift/metrics.json                   |   4 +-
 tests/common/custom_cluster_test_suite.py    |   8 +-
 tests/custom_cluster/test_catalog_wait.py    |   2 +-
 tests/custom_cluster/test_executor_groups.py |  79 +++++++++++-
 tests/custom_cluster/test_runtime_profile.py |   2 +-
 tests/custom_cluster/test_scratch_disk.py    |  14 +--
 9 files changed, 261 insertions(+), 86 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 1d1f544..da79c07 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -71,12 +71,17 @@ string PrintBytes(int64_t value) {
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
-// Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
-// "!" is used because the backend id contains a colon, but it should not contain "!".
-// When parsing the topic key we need to be careful to find the last instance in
-// case the pool name contains it as well.
+// Delimiter used for pool topic keys of the form
+// "<prefix><pool_name><delimiter><backend_id>". "!" is used because the backend id
+// contains a colon, but it should not contain "!". When parsing the topic key we need to
+// be careful to find the last instance in case the pool name contains it as well.
 const char TOPIC_KEY_DELIMITER = '!';
 
+// Prefix used by topic keys for pool stat updates.
+const string TOPIC_KEY_POOL_PREFIX = "POOL:";
+// Prefix used by topic keys for PerHostStat updates.
+const string TOPIC_KEY_STAT_PREFIX = "STAT:";
+
 // Delimiter used for the resource pool prefix of executor groups. In order to be used for
 // queries in "resource-pool-A", an executor group name must start with
 // "resource-pool-A-".
@@ -236,21 +241,30 @@ const string HOST_SLOT_NOT_AVAILABLE = "Not enough admission control slots avail
                                        "host $0. Needed $1 slots but $2/$3 are already "
                                        "in use.";
 
-// Parses the pool name and backend_id from the topic key if it is valid.
-// Returns true if the topic key is valid and pool_name and backend_id are set.
-static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
-    string* backend_id) {
-  // Topic keys will look something like: poolname!hostname:22000
-  // The '!' delimiter should always be present, the pool name must be
-  // at least 1 character, and network address must be at least 3 characters (including
-  // ':' and if the hostname and port are each only 1 character). Then the topic_key must
-  // be at least 5 characters (1 + 1 + 3).
-  const int MIN_TOPIC_KEY_SIZE = 5;
+// Parses the topic key to separate the prefix that helps recognize the kind of update
+// received.
+static inline bool ParseTopicKey(
+    const string& topic_key, string* prefix, string* suffix) {
+  // The prefix should always be present and the network address must be at least 3
+  // characters (including ':' and if the hostname and port are each only 1 character).
+  // Then the topic_key must be at least 8 characters (5 + 3).
+  const int MIN_TOPIC_KEY_SIZE = 8;
   if (topic_key.length() < MIN_TOPIC_KEY_SIZE) {
     VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
     return false;
   }
+  DCHECK_EQ(TOPIC_KEY_POOL_PREFIX.size(), TOPIC_KEY_STAT_PREFIX.size())
+      << "All admission topic key prefixes should be of the same size";
+  *prefix = topic_key.substr(0, TOPIC_KEY_POOL_PREFIX.size());
+  *suffix = topic_key.substr(TOPIC_KEY_POOL_PREFIX.size());
+  return true;
+}
 
+// Parses the pool name and backend_id from the topic key if it is valid.
+// Returns true if the topic key is valid and pool_name and backend_id are set.
+static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
+    string* backend_id) {
+  // Pool topic keys will look something like: poolname!hostname:22000
   size_t pos = topic_key.find_last_of(TOPIC_KEY_DELIMITER);
   if (pos == string::npos || pos >= topic_key.size() - 1) {
     VLOG_QUERY << "Invalid topic key for pool: " << topic_key;
@@ -851,16 +865,25 @@ bool AdmissionController::HasAvailableMemResources(const ScheduleState& state,
     const NetworkAddressPB& host = entry.first;
     const string host_id = NetworkAddressPBToString(host);
     int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit();
-    const HostStats& host_stats = host_stats_[host_id];
+    const THostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
-    int64_t mem_admitted = host_stats.mem_admitted;
+    int64_t agg_mem_admitted_on_host = host_stats.mem_admitted;
+    // Aggregate the mem admitted across all queries admitted by other coordinators.
+    for (const auto& remote_entry : remote_per_host_stats_) {
+      auto remote_stat_itr = remote_entry.second.find(host_id);
+      if (remote_stat_itr != remote_entry.second.end()) {
+        agg_mem_admitted_on_host += remote_stat_itr->second.mem_admitted;
+      }
+    }
     int64_t mem_to_admit = GetMemToAdmit(state, entry.second);
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
-             << " mem_admitted=" << PrintBytes(mem_admitted)
+             << " mem_admitted=" << PrintBytes(host_stats.mem_admitted)
+             << " agg_mem_admitted_on_host=" << PrintBytes(agg_mem_admitted_on_host)
              << " needs=" << PrintBytes(mem_to_admit)
              << " admit_mem_limit=" << PrintBytes(admit_mem_limit);
-    int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
+    int64_t effective_host_mem_reserved =
+        std::max(mem_reserved, agg_mem_admitted_on_host);
     if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
       *mem_unavailable_reason =
           Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
@@ -894,14 +917,23 @@ bool AdmissionController::HasAvailableSlots(const ScheduleState& state,
     const NetworkAddressPB& host = entry.first;
     const string host_id = NetworkAddressPBToString(host);
     int64_t admission_slots = entry.second.be_desc.admission_slots();
-    int64_t slots_in_use = host_stats_[host_id].slots_in_use;
+    int64_t agg_slots_in_use_on_host = host_stats_[host_id].slots_in_use;
+    // Aggregate the slots in use across all queries admitted by other coordinators.
+    for (const auto& remote_entry : remote_per_host_stats_) {
+      auto remote_stat_itr = remote_entry.second.find(host_id);
+      if (remote_stat_itr != remote_entry.second.end()) {
+        agg_slots_in_use_on_host += remote_stat_itr->second.slots_in_use;
+      }
+    }
     VLOG_ROW << "Checking available slot on host=" << host_id
-             << " slots_in_use=" << slots_in_use
-             << " needs=" << slots_in_use + entry.second.exec_params->slots_to_use()
+             << " slots_in_use=" << agg_slots_in_use_on_host << " needs="
+             << agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
              << " executor admission_slots=" << admission_slots;
-    if (slots_in_use + entry.second.exec_params->slots_to_use() > admission_slots) {
+    if (agg_slots_in_use_on_host + entry.second.exec_params->slots_to_use()
+        > admission_slots) {
       *unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id,
-          entry.second.exec_params->slots_to_use(), slots_in_use, admission_slots);
+          entry.second.exec_params->slots_to_use(), agg_slots_in_use_on_host,
+          admission_slots);
       if (entry.second.be_desc.is_coordinator()) {
         coordinator_resource_limited = true;
       }
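
The check both hunks implement can be summarized by a small Python sketch
(names are illustrative): the local count for a host is combined with every
remote coordinator's view of that host before comparing against the limit.

    def agg_slots_in_use(host_id, local_host_stats, remote_per_host_stats):
        # Slots used by queries this coordinator admitted on 'host_id' ...
        total = local_host_stats[host_id].slots_in_use
        # ... plus slots used by queries other coordinators admitted there.
        for remote_view in remote_per_host_stats.values():
            if host_id in remote_view:
                total += remote_view[host_id].slots_in_use
        return total

    def has_available_slots(host_id, slots_to_use, limit, local, remote):
        return agg_slots_in_use(host_id, local, remote) + slots_to_use <= limit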
@@ -1544,7 +1576,7 @@ void AdmissionController::UpdatePoolStats(
     vector<TTopicDelta>* subscriber_topic_updates) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    AddPoolUpdates(subscriber_topic_updates);
+    AddPoolAndPerHostStatsUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
         incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
@@ -1590,26 +1622,54 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
 }
 
 void AdmissionController::HandleTopicUpdates(const vector<TTopicItem>& topic_updates) {
-  for (const TTopicItem& item: topic_updates) {
-    string pool_name;
-    string topic_backend_id;
-    if (!ParsePoolTopicKey(item.key, &pool_name, &topic_backend_id)) continue;
-    // The topic entry from this subscriber is handled specially; the stats coming
-    // from the statestore are likely already outdated.
-    if (topic_backend_id == host_id_) continue;
-    if (item.deleted) {
-      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
-      continue;
-    }
-    TPoolStats remote_update;
-    uint32_t len = item.value.size();
-    Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-          item.value.data()), &len, false, &remote_update);
-    if (!status.ok()) {
-      VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
-      continue;
+  string topic_key_prefix;
+  string topic_key_suffix;
+  string pool_name;
+  string topic_backend_id;
+  for (const TTopicItem& item : topic_updates) {
+    if (!ParseTopicKey(item.key, &topic_key_prefix, &topic_key_suffix)) continue;
+    if (topic_key_prefix == TOPIC_KEY_POOL_PREFIX) {
+      if (!ParsePoolTopicKey(topic_key_suffix, &pool_name, &topic_backend_id)) continue;
+      // The topic entry from this subscriber is handled specially; the stats coming
+      // from the statestore are likely already outdated.
+      if (topic_backend_id == host_id_) continue;
+      if (item.deleted) {
+        GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
+        continue;
+      }
+      TPoolStats remote_update;
+      uint32_t len = item.value.size();
+      Status status =
+          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+              false, &remote_update);
+      if (!status.ok()) {
+        VLOG_QUERY << "Error deserializing pool update with key: " << item.key;
+        continue;
+      }
+      GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
+    } else if (topic_key_prefix == TOPIC_KEY_STAT_PREFIX) {
+      topic_backend_id = topic_key_suffix;
+      if (topic_backend_id == host_id_) continue;
+      if (item.deleted) {
+        remote_per_host_stats_.erase(topic_backend_id);
+        continue;
+      }
+      TPerHostStatsUpdate remote_update;
+      uint32_t len = item.value.size();
+      Status status =
+          DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(item.value.data()), &len,
+              false, &remote_update);
+      if (!status.ok()) {
+        VLOG_QUERY << "Error deserializing stats update with key: " << item.key;
+        continue;
+      }
+      PerHostStats& stats = remote_per_host_stats_[topic_backend_id];
+      for (const auto& elem : remote_update.per_host_stats) {
+        stats[elem.host_addr] = elem.stats;
+      }
+    } else {
+      VLOG_QUERY << "Invalid topic key prefix: " << topic_key_prefix;
     }
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, &remote_update);
   }
 }
 
@@ -1873,13 +1933,17 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   metrics_.local_backend_mem_usage->SetValue(current_usage);
 }
 
-void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
+void AdmissionController::AddPoolAndPerHostStatsUpdates(
+    vector<TTopicDelta>* topic_updates) {
   // local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used
   // for local admission control decisions). Update that now before sending local_stats_.
   for (auto& entry : pool_stats_) {
     entry.second.UpdateMemTrackerStats();
   }
-  if (pools_for_updates_.empty()) return;
+  if (pools_for_updates_.empty()) {
+    // No pool updates means no changes to host stats either, so just return.
+    return;
+  }
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
   topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
@@ -1894,10 +1958,28 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
         &topic_item.value);
     if (!status.ok()) {
       LOG(WARNING) << "Failed to serialize query pool stats: " << status.GetDetail();
-      topic_updates->pop_back();
+      topic_delta.topic_entries.pop_back();
     }
   }
   pools_for_updates_.clear();
+
+  // Now add the host stats
+  topic_delta.topic_entries.push_back(TTopicItem());
+  TTopicItem& topic_item = topic_delta.topic_entries.back();
+  topic_item.key = Substitute("$0$1", TOPIC_KEY_STAT_PREFIX, host_id_);
+  TPerHostStatsUpdate update;
+  for (const auto& elem : host_stats_) {
+    update.per_host_stats.emplace_back();
+    TPerHostStatsUpdateElement& inserted_elem = update.per_host_stats.back();
+    inserted_elem.__set_host_addr(elem.first);
+    inserted_elem.__set_stats(elem.second);
+  }
+  Status status =
+      thrift_serializer_.SerializeToString(&update, &topic_item.value);
+  if (!status.ok()) {
+    LOG(WARNING) << "Failed to serialize host stats: " << status.GetDetail();
+    topic_delta.topic_entries.pop_back();
+  }
 }
 
 void AdmissionController::DequeueLoop() {
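
Conceptually, each outgoing statestore update now carries one extra topic
item holding this coordinator's entire host_stats_ map, keyed by
"STAT:<host_id>" (a sketch; Thrift serialization elided):

    def build_host_stats_item(host_id, host_stats):
        # The key names the sending coordinator; the value carries
        # (host_addr, stats) pairs for every host it knows about.
        key = "STAT:" + host_id
        value = [(addr, stats) for addr, stats in host_stats.items()]
        return key, value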
@@ -2362,7 +2444,8 @@ string AdmissionController::MakePoolTopicKey(
   // Ensure the backend_id does not contain the delimiter to ensure that the topic key
   // can be parsed properly by finding the last instance of the delimiter.
   DCHECK_EQ(backend_id.find(TOPIC_KEY_DELIMITER), string::npos);
-  return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
+  return Substitute(
+      "$0$1$2$3", TOPIC_KEY_POOL_PREFIX, pool_name, TOPIC_KEY_DELIMITER, backend_id);
 }
 
 vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index fb8bca2..e52a769 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -437,23 +437,10 @@ class AdmissionController {
   /// Calls ResetInformationalStats on all pools.
   void ResetAllPoolInformationalStats();
 
-  // This struct stores per-host statistics which are used during admission and by HTTP
-  // handlers to query admission control statistics for currently registered backends.
-  struct HostStats {
-    /// The mem reserved for a query that is currently executing is its memory limit, if
-    /// set (which should be the common case with admission control). Otherwise, if the
-    /// query has no limit or the query is finished executing, the current consumption
-    /// (tracked by its query mem tracker) is used.
-    int64_t mem_reserved = 0;
-    /// The per host mem admitted only for the queries admitted locally.
-    int64_t mem_admitted = 0;
-    /// The per host number of queries admitted only for the queries admitted locally.
-    int64_t num_admitted = 0;
-    /// The per host number of slots in use for the queries admitted locally.
-    int64_t slots_in_use = 0;
-  };
-
-  typedef std::unordered_map<std::string, HostStats> PerHostStats;
+  // This maps a backend's id (host/port id) to its host level statistics which are used
+  // during admission and by HTTP handlers to query admission control statistics for
+  // currently registered backends.
+  typedef std::unordered_map<std::string, THostStats> PerHostStats;
 
   // Populates the input map with the per host memory reserved and admitted in the
   // following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
@@ -510,6 +497,11 @@ class AdmissionController {
 
   PerHostStats host_stats_;
 
+  /// A map from another coordinator's host_id (host/port id) -> its view of the
+  /// PerHostStats. Used to get a full view of the cluster state while making admission
+  /// decisions. Updated via statestore updates.
+  std::unordered_map<std::string, PerHostStats> remote_per_host_stats_;
+
   /// Counter of the number of times dequeuing a query failed because of a resource
   /// issue on the coordinator (which therefore cannot be resolved by adding more
   /// executor groups).
@@ -908,9 +900,10 @@ class AdmissionController {
       std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Adds outgoing topic updates to subscriber_topic_updates for pools that have changed
-  /// since the last call to AddPoolUpdates(). Called by UpdatePoolStats() before
-  /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
-  void AddPoolUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
+  /// since the last call to AddPoolAndPerHostStatsUpdates(). Also adds the complete
+  /// local view of per-host statistics. Called by UpdatePoolStats() before
+  /// UpdateClusterAggregates(). Must hold admission_ctrl_lock_.
+  void AddPoolAndPerHostStatsUpdates(std::vector<TTopicDelta>* subscriber_topic_updates);
 
   /// Updates the remote stats with per-host topic_updates coming from the statestore.
   /// Removes remote stats identified by topic deletions coming from the
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 41dedb4..71eb579 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -74,6 +74,34 @@ struct TPoolStats {
   8: required i64 num_running;
 }
 
+struct THostStats {
+  // The mem reserved for a query that is currently executing is its memory limit, if
+  // set (which should be the common case with admission control). Otherwise, if the
+  // query has no limit or the query is finished executing, the current consumption
+  // (tracked by its query mem tracker) is used.
+  1: required i64 mem_reserved;
+
+  // The per host mem admitted only for the queries admitted locally.
+  2: required i64 mem_admitted;
+
+  // The per host number of queries admitted only for the queries admitted locally.
+  3: required i64 num_admitted;
+
+  // The per host number of slots in use for the queries admitted locally.
+  4: required i64 slots_in_use;
+}
+
+struct TPerHostStatsUpdateElement {
+    1: required string host_addr;
+    2: required THostStats stats;
+}
+
+struct TPerHostStatsUpdate {
+  // This stores per-host statistics which are used during admission and by HTTP
+  // handlers to query admission control statistics for currently registered backends.
+  1: required list<TPerHostStatsUpdateElement> per_host_stats;
+}
+
 // Description of a single entry in a topic
 struct TTopicItem {
   // Human-readable topic entry identifier
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index c5f600e..2805acc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2762,11 +2762,11 @@
     "key": "cluster-membership.backends.total"
   },
   {
-  "description": "Total number of queries running on executor group: $0",
+  "description": "Total number of queries admitted on this coordinator running on executor group: $0",
   "contexts": [
     "IMPALAD"
   ],
-  "label": "Total number of queries running on executor group: $0",
+  "label": "Total number of queries admitted on this coordinator running on executor group: $0",
   "units": "UNIT",
   "kind": "GAUGE",
   "key": "admission-controller.executor-group.num-queries-executing.$0"
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 448a591..6a831bb 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -167,7 +167,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     kwargs = {
       "cluster_size": cluster_size,
       "num_coordinators": num_coordinators,
-      "expected_num_executors": cluster_size,
+      "expected_num_impalads": cluster_size,
       "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
       "use_exclusive_coordinators": use_exclusive_coordinators
     }
@@ -233,7 +233,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             use_exclusive_coordinators=False,
                             add_executors=False,
                             log_level=1,
-                            expected_num_executors=DEFAULT_CLUSTER_SIZE,
+                            expected_num_impalads=DEFAULT_CLUSTER_SIZE,
                             expected_subscribers=0,
                             default_query_options=None,
                             statestored_timeout_s=60,
@@ -285,12 +285,12 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     # The number of statestore subscribers is
     # cluster_size (# of impalad) + 1 (for catalogd).
     if expected_subscribers == 0:
-      expected_subscribers = expected_num_executors + 1
+      expected_subscribers = expected_num_impalads + 1
       if "--enable_admission_service" in options:
         expected_subscribers += 1
 
     statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                   timeout=statestored_timeout_s)
     for impalad in cls.cluster.impalads:
-      impalad.service.wait_for_num_known_live_backends(expected_num_executors,
+      impalad.service.wait_for_num_known_live_backends(expected_num_impalads,
                                                        timeout=impalad_timeout_s)
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index 34b366a..11b90d1 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -49,7 +49,7 @@ class TestCatalogWait(CustomClusterTestSuite):
 
     # On startup, expect only two executors to be registered.
     self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
-                               expected_num_executors=2,
+                               expected_num_impalads=2,
                                expected_subscribers=4)
 
     # Expect that impalad[2] is not ready.
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index df21193..750ff0a 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -43,7 +43,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     method.func_dict["cluster_size"] = 1
     method.func_dict["num_exclusive_coordinators"] = 1
     self.num_groups = 1
-    self.num_executors = 1
+    self.num_impalads = 1
     super(TestExecutorGroups, self).setup_method(method)
     self.coordinator = self.cluster.impalads[0]
 
@@ -53,7 +53,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     return "default-pool-%s" % name
 
   def _add_executor_group(self, name_suffix, min_size, num_executors=0,
-                          admission_control_slots=0):
+                          admission_control_slots=0, extra_args=None):
     """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
     the new group to be considered healthy. 'num_executors' specifies the number of
     executors to start and defaults to 'min_size' but can be different from 'min_size' to
@@ -63,7 +63,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.num_groups += 1
     if num_executors == 0:
       num_executors = min_size
-    self.num_executors += num_executors
+    self.num_impalads += num_executors
     name = self._group_name(name_suffix)
     LOG.info("Adding %s executors to group %s with minimum size %s" %
              (num_executors, name, min_size))
@@ -71,11 +71,29 @@ class TestExecutorGroups(CustomClusterTestSuite):
                     admission_control_slots]
     if len(name_suffix) > 0:
       cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
+    if extra_args:
+      cluster_args.append("--impalad_args=%s" % extra_args)
     self._start_impala_cluster(options=cluster_args,
                                cluster_size=num_executors,
                                num_coordinators=0,
                                add_executors=True,
-                               expected_num_executors=self.num_executors)
+                               expected_num_impalads=self.num_impalads)
+
+  def _restart_coordinators(self, num_coordinators, extra_args=None):
+    """Restarts the coordinator spawned in setup_method and enables the caller to start
+    more than one coordinator by specifying 'num_coordinators'."""
+    LOG.info("Restarting coordinators")
+    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
+    if extra_args:
+      cluster_args.append("--impalad_args=%s" % extra_args)
+    self._start_impala_cluster(options=cluster_args,
+                               cluster_size=num_coordinators,
+                               num_coordinators=num_coordinators,
+                               add_executors=False,
+                               expected_num_impalads=num_coordinators,
+                               use_exclusive_coordinators=True)
+    self.coordinator = self.cluster.impalads[0]
+    self.num_impalads = num_coordinators
 
   def _get_total_admitted_queries(self):
     """Returns the total number of queries that have been admitted to the default resource
@@ -513,3 +531,56 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2,
                                                    timeout=20)
     assert_hash_join()
+
+  @pytest.mark.execute_serially
+  def test_admission_control_with_multiple_coords(self):
+    """This test verifies that host level metrics like the num of admission slots used
+    and memory admitted is disseminated correctly across the cluster and accounted for
+    while making admission decisions. We run a query that takes up all of a particular
+    resource (slots or memory) and check if attempting to run a query on the other
+    coordinator results in queuing."""
+    # A long running query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypes \
+                 where month < 3 and id + random() < sleep(100);"
+    # default_pool_mem_limit is set to enable mem based admission.
+    self._restart_coordinators(num_coordinators=2,
+                               extra_args="-default_pool_mem_limit=100g")
+    # Create fresh clients
+    second_coord_client = self.create_client_for_nth_impalad(1)
+    self.create_impala_clients()
+    # Add an exec group with a 4gb mem_limit.
+    self._add_executor_group("group1", 2, admission_control_slots=2,
+                             extra_args="-mem_limit=4g")
+    assert self._get_num_executor_groups(only_healthy=True) == 1
+    second_coord_client.set_configuration({'mt_dop': '2'})
+    handle_for_second = second_coord_client.execute_async(QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+    handle_for_first = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_for_first)
+    assert "queue reason: Not enough admission control slots available on host" in \
+           profile, profile
+    self.close_query(handle_for_first)
+    second_coord_client.close_query(handle_for_second)
+    # Wait for first coordinator to get the admission update.
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 0, timeout=30)
+    # Now verify that mem based admission also works as intended. A max of mem_reserved
+    # and mem_admitted is used for this. Since mem_limit is being used here, both will be
+    # identical but this will at least test that code path as a sanity check.
+    second_coord_client.clear_configuration()
+    second_coord_client.set_configuration({'mem_limit': '4g'})
+    handle_for_second = second_coord_client.execute_async(QUERY)
+    # Verify that the first coordinator knows about the query running on the second
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.agg-num-running.default-pool", 1, timeout=30)
+    handle_for_first = self.execute_query_async(TEST_QUERY)
+    self.coordinator.service.wait_for_metric_value(
+      "admission-controller.local-num-queued.default-pool", 1, timeout=30)
+    profile = self.client.get_runtime_profile(handle_for_first)
+    assert "queue reason: Not enough memory available on host" in profile, profile
+    self.close_query(handle_for_first)
+    second_coord_client.close_query(handle_for_second)
diff --git a/tests/custom_cluster/test_runtime_profile.py b/tests/custom_cluster/test_runtime_profile.py
index 933ad7c..7be2166 100644
--- a/tests/custom_cluster/test_runtime_profile.py
+++ b/tests/custom_cluster/test_runtime_profile.py
@@ -44,5 +44,5 @@ class TestRuntimeProfile(CustomClusterTestSuite):
                                cluster_size=1,
                                num_coordinators=0,
                                add_executors=True,
-                               expected_num_executors=4)
+                               expected_num_impalads=4)
     self.run_test_case('runtime-profile-aggregated', vector)
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 4c0f6b3..83d7410 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -213,7 +213,7 @@ class TestScratchDir(CustomClusterTestSuite):
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -247,7 +247,7 @@ class TestScratchDir(CustomClusterTestSuite):
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'], cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -281,7 +281,7 @@ class TestScratchDir(CustomClusterTestSuite):
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -313,7 +313,7 @@ class TestScratchDir(CustomClusterTestSuite):
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -347,7 +347,7 @@ class TestScratchDir(CustomClusterTestSuite):
       '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
       '--impalad_args=--allow_spill_to_hdfs=true'],
       cluster_size=1,
-      expected_num_executors=1)
+      expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -380,7 +380,7 @@ class TestScratchDir(CustomClusterTestSuite):
       '--impalad_args=--allow_spill_to_hdfs=true',
       '--impalad_args=--remote_tmp_file_size=8M',
       '--impalad_args=--remote_tmp_file_block_size=1m'],
-      cluster_size=1, expected_num_executors=1)
+      cluster_size=1, expected_num_impalads=1)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
@@ -413,7 +413,7 @@ class TestScratchDir(CustomClusterTestSuite):
       '--impalad_args=--allow_spill_to_hdfs=true',
       '--impalad_args=--remote_tmp_file_size=8M',
       '--impalad_args=--remote_tmp_file_block_size=1m'],
-      cluster_size=num, num_coordinators=num, expected_num_executors=num)
+      cluster_size=num, num_coordinators=num, expected_num_impalads=num)
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit

[impala] 03/04: IMPALA-10779: Print the username closing a session or cancelling a query from the WebUI

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8a2758f1e88500867d8d2fbaffb62493fca2215a
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Fri Jul 23 23:11:54 2021 +0800

    IMPALA-10779: Print the username closing a session or cancelling a query from the WebUI
    
    This patch adds the username of the client who made the request to the
    cause message when a session is closed or a query is cancelled from the
    coordinator's debug WebUI.
    
    Tests:
    - Added a new fe test for LDAP auth to verify that the new status gets
      printed in runtime profile and coordinator log when a query is
      cancelled in this way.
    
    Change-Id: I02c92b5caee61d1f9f381cd2906a850e02c54d55
    Reviewed-on: http://gerrit.cloudera.org:8080/17726
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/util/web_callback_registry.h           |  3 +
 be/src/service/impala-http-handler.cc              |  8 +--
 be/src/util/webserver.cc                           |  3 +
 .../apache/impala/customcluster/LdapHS2Test.java   | 13 ++++
 .../impala/customcluster/LdapWebserverTest.java    | 69 ++++++++++++++++++++++
 tests/webserver/test_web_pages.py                  |  7 ++-
 6 files changed, 96 insertions(+), 7 deletions(-)
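
The end-to-end flow the tests below exercise can be sketched in Python as
(assuming the same debug WebUI endpoints; 'anonymous' is the default when no
authentication is used):

    import requests  # the HTTP client the e2e tests already use

    def cancel_and_verify(webui_base, query_id, expected_user="anonymous"):
        # Cancel via the debug WebUI, then check the plain-text profile for
        # the cause string that now names the requesting user.
        requests.get("%s/cancel_query?query_id=%s" % (webui_base, query_id))
        profile = requests.get(
            "%s/query_profile_plain_text?query_id=%s"
            % (webui_base, query_id)).text
        assert "debug web interface by user: " in profile
        assert expected_user in profile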

diff --git a/be/src/kudu/util/web_callback_registry.h b/be/src/kudu/util/web_callback_registry.h
index d2e4ecc..75fa5a6 100644
--- a/be/src/kudu/util/web_callback_registry.h
+++ b/be/src/kudu/util/web_callback_registry.h
@@ -68,6 +68,9 @@ class WebCallbackRegistry {
 
     // The socket address of the requester, <host>:<port>.
     std::string source_socket;
+
+    // Authenticated user, or 'anonymous' if no auth used
+    std::string source_user = "anonymous";
   };
 
   // A response to an HTTP request whose body is rendered by template.
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 677f50c..caffd7b 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -247,8 +247,8 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
-  Status cause(Substitute("Cancelled from Impala's debug web interface by client at $0"
-                           , req.source_socket));
+  Status cause(Substitute("Cancelled from Impala's debug web interface by user:"
+                          " '$0' at $1", req.source_user, req.source_socket));
   // Web UI doesn't have access to secret so we can't validate it. We assume that
   // web UI is allowed to close queries.
   status = server_->UnregisterQuery(unique_id, true, &cause);
@@ -270,8 +270,8 @@ void ImpalaHttpHandler::CloseSessionHandler(const Webserver::WebRequest& req,
     document->AddMember("error", error, document->GetAllocator());
     return;
   }
-  Status cause(Substitute("Session closed from Impala's debug web interface by client at"
-                          " $0", req.source_socket));
+  Status cause(Substitute("Session closed from Impala's debug web interface by user:"
+                          " '$0' at $1", req.source_user, req.source_socket));
   // Web UI doesn't have access to secret so we can't validate it. We assume that
   // web UI is allowed to close sessions.
   status = server_->CloseSessionInternal(unique_id,
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 5a6c1e2..27c7af9 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -730,6 +730,9 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
     req.query_string = request_info->query_string;
     BuildArgumentMap(request_info->query_string, &req.parsed_args);
   }
+  if (request_info->remote_user != nullptr) {
+    req.source_user = request_info->remote_user;
+  }
 
   HttpStatusCode response = HttpStatusCode::Ok;
   ContentType content_type = HTML;
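
The defaulting behavior, as a Python sketch: the request keeps 'anonymous'
unless the HTTP layer supplied an authenticated remote user.

    class WebRequest(object):
        def __init__(self, remote_user=None):
            # Mirrors source_user above: default to 'anonymous', overwrite
            # only when authentication supplied a user.
            self.source_user = (remote_user if remote_user is not None
                                else "anonymous")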
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index 8757b1e..f7848d0 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -97,6 +97,19 @@ public class LdapHS2Test {
     return execResp.getOperationHandle();
   }
 
+  /**
+   * Executes 'query' asynchronously.
+   */
+  static TOperationHandle execQueryAsync(TCLIService.Iface client,
+      TSessionHandle sessionHandle, String query)
+      throws Exception {
+    TExecuteStatementReq execReq = new TExecuteStatementReq(sessionHandle, query);
+    execReq.setRunAsync(true);
+    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
+    verifySuccess(execResp.getStatus());
+
+    return execResp.getOperationHandle();
+  }
+
   private void verifyMetrics(long expectedBasicAuthSuccess, long expectedBasicAuthFailure)
       throws Exception {
     long actualBasicAuthSuccess = (long) metrics.getMetric(
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
index 6e68db6..4e50d8f 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
@@ -28,16 +28,22 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Range;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.directory.server.core.annotations.CreateDS;
 import org.apache.directory.server.core.annotations.CreatePartition;
 import org.apache.directory.server.annotations.CreateLdapServer;
 import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.apache.hive.service.rpc.thrift.*;
 import org.apache.impala.util.Metrics;
 import org.apache.log4j.Logger;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.THttpClient;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -297,6 +303,56 @@ public class LdapWebserverTest {
     verifyJwtAuthMetrics(Range.closed(1L, 1L), Range.closed(1L, 1L));
   }
 
+  /**
+   * Print the username closing a session or cancelling a query from the WebUI.
+   */
+  @Test
+  public void testDisplaySrcUsernameInQueryCause() throws Exception {
+    setUp("", "");
+    // Create client
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+    // Authenticate as 'Test1Ldap' with password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+
+    // Execute a long running query then cancel it from the WebUI.
+    // Check the runtime profile and the INFO logs for the cause message.
+    TOperationHandle operationHandle = LdapHS2Test.execQueryAsync(
+        client, openResp.getSessionHandle(), "select sleep(10000)");
+    String queryId = PrintId(operationHandle.getOperationId());
+    String cancelQueryUrl = String.format("/cancel_query?query_id=%s", queryId);
+    String textProfileUrl = String.format("/query_profile_plain_text?query_id=%s",
+            queryId);
+    metrics_.readContent(cancelQueryUrl);
+    String response = metrics_.readContent(textProfileUrl);
+    String cancelStatus = String.format("Cancelled from Impala&apos;s debug web interface"
+        + " by user: &apos;%s&apos; at", TEST_USER_1);
+    assertTrue(response.contains(cancelStatus));
+    // Wait for logs to flush
+    TimeUnit.SECONDS.sleep(6);
+    response = metrics_.readContent("/logs");
+    assertTrue(response.contains(cancelStatus));
+
+    // Session closing from the WebUI does not produce the cause message in the profile,
+    // so we will skip checking the runtime profile.
+    String sessionId = PrintId(openResp.getSessionHandle().getSessionId());
+    String closeSessionUrl =  String.format("/close_session?session_id=%s", sessionId);
+    metrics_.readContent(closeSessionUrl);
+    // Wait for logs to flush
+    TimeUnit.SECONDS.sleep(6);
+    String closeStatus = String.format("Session closed from Impala&apos;s debug web"
+        + " interface by user: &apos;%s&apos; at", TEST_USER_1);
+    response = metrics_.readContent("/logs");
+    assertTrue(response.contains(closeStatus));
+  }
+
   // Helper method to make a get call to the webserver using the input basic
   // auth token and x-forward-for token.
   private void attemptConnection(String basic_auth_token, String xff_address)
@@ -311,4 +367,17 @@ public class LdapWebserverTest {
     }
     connection.getInputStream();
   }
+
+  // Helper method to get query id or session id
+  private static String PrintId(THandleIdentifier handle) {
+    // The binary representation is present in the query handle but we need to
+    // massage it into the expected string representation.
+    byte[] guid_bytes = handle.getGuid();
+    assertEquals(guid_bytes.length, 16);
+    byte[] low_bytes = ArrayUtils.subarray(guid_bytes, 0, 8);
+    byte[] high_bytes = ArrayUtils.subarray(guid_bytes, 8, 16);
+    ArrayUtils.reverse(low_bytes);
+    ArrayUtils.reverse(high_bytes);
+    return Hex.encodeHexString(low_bytes) + ":" + Hex.encodeHexString(high_bytes);
+  }
 }
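
For reference, the same GUID massaging as a Python sketch equivalent to
PrintId above: split the 16 bytes into two 8-byte halves, reverse each, and
hex-encode them as "<low>:<high>".

    def print_id(guid_bytes):
        assert len(guid_bytes) == 16
        low, high = guid_bytes[:8][::-1], guid_bytes[8:][::-1]
        return low.hex() + ":" + high.hex()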
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 01661a4..bfed002 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -823,10 +823,11 @@ class TestWebPage(ImpalaTestSuite):
       .format("25000"), query_id)
     requests.get(cancel_query_url)
     response = requests.get(text_profile_url)
-    cancel_status = "Cancelled from Impala&apos;s debug web interface by client at"
+    cancel_status = "Cancelled from Impala&apos;s debug web interface by user: " \
+                    "&apos;anonymous&apos; at"
     assert cancel_status in response.text
     self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web "
-      "interface by client at", expected_count=-1)
+      "interface by user: 'anonymous' at", expected_count=-1)
     # Session closing from the WebUI does not produce the cause message in the profile,
     # so we will skip checking the runtime profile.
     results = self.execute_query("select current_session()")
@@ -835,7 +836,7 @@ class TestWebPage(ImpalaTestSuite):
       ("25000"), session_id)
     requests.get(close_session_url)
     self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug "
-      "web interface by client at", expected_count=-1)
+      "web interface by user: 'anonymous' at", expected_count=-1)
 
   def test_catalog_operations_endpoint(self):
     """Test to check that the /operations endpoint returns 200 OK."""