Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/28 20:51:43 UTC

[impala] branch master updated: IMPALA-9989 Improve admission control pool stats logging

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ef6184  IMPALA-9989 Improve admission control pool stats logging
2ef6184 is described below

commit 2ef6184ee1010d29fdaa5cd4ba5f0c95ef9abc0d
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Mon Jul 20 13:20:11 2020 -0700

    IMPALA-9989 Improve admission control pool stats logging
    
    This work addresses a current limitation in the admission controller by
    appending the last known memory consumption statistics for the set of
    queries running or waiting on a host or in a pool to the existing memory
    exhaustion message. The statistics are logged in impalad.INFO when a
    query is queued, or queued and then timed out, due to memory pressure in
    the pool or on the host. The statistics can also be included in the
    query profile.
    
    The new memory consumption statistics are either per-host stats or
    aggregated pool stats. The per-host stats describe the memory consumption
    of every pool on a host. The aggregated pool stats describe the
    aggregated memory consumption across all hosts for a pool. For each stats
    type, the query ids and memory consumption of up to the top 5 queries
    are provided, in addition to the min, the max, the average, and the
    total memory consumption for the query set.
    
    When a query request is queued due to memory exhaustion, the new
    consumption statistics described above are logged when the BE logging
    level is set to 2.
    
    When a query request is timed out due to memory exhaustion, the new
    consumption statistics described above are logged when the BE logging
    level is set to 1.
    
    Testing:
    1. Added a new test TopNQueryCheck in admission-controller-test.cc to
       verify that the topN query memory consumption details are reported
       correctly.
    2. Added two new tests in test_admission_controller.py to simulate
       queries being queued and then timed out due to pool or host memory
       pressure.
    3. Added a new test TopN in mem-tracker-test.cc to
       verify that the topN query memory consumption details are computed
       correctly from a mem tracker hierarchy.
    4. Ran Core tests successfully.
    
    Change-Id: Id995a9d044082c3b8f044e1ec25bb4c64347f781
    Reviewed-on: http://gerrit.cloudera.org:8080/16220
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/mem-tracker-test.cc                |  37 +++
 be/src/runtime/mem-tracker.cc                     |  72 +++++
 be/src/runtime/mem-tracker.h                      |  47 ++-
 be/src/scheduling/admission-controller-test.cc    | 261 ++++++++++++++--
 be/src/scheduling/admission-controller.cc         | 364 +++++++++++++++++++++-
 be/src/scheduling/admission-controller.h          |  90 +++++-
 be/src/util/container-util.h                      |   9 +
 common/thrift/StatestoreService.thrift            |  30 ++
 common/thrift/generate_error_codes.py             |   2 +-
 tests/custom_cluster/test_admission_controller.py |  59 ++++
 10 files changed, 924 insertions(+), 47 deletions(-)
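
For illustration, the bookkeeping described above amounts to a size-bounded
min-heap holding the top-N queries plus running min/max/total counters over
all queries. The short standalone sketch below is not part of the commit:
plain std::pair entries and made-up query names stand in for the Thrift
THeavyMemoryQuery/TPoolStats structures used by the real code (see
MemTracker::UpdatePoolStatsForQueries in the diff).

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <limits>
    #include <queue>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
      // Made-up (bytes consumed, query id) pairs; the real code walks query
      // MemTrackers and stores THeavyMemoryQuery entries in TPoolStats.
      const std::vector<std::pair<int64_t, std::string>> queries = {
          {7 << 20, "q1"}, {3 << 20, "q2"}, {19 << 20, "q3"}, {11 << 20, "q4"},
          {5 << 20, "q5"}, {23 << 20, "q6"}, {2 << 20, "q7"}};
      const size_t limit = 5;
      // Min-heap bounded at 'limit': the lightest retained query sits on top
      // and is evicted whenever an insertion pushes the size past the limit.
      std::priority_queue<std::pair<int64_t, std::string>,
          std::vector<std::pair<int64_t, std::string>>,
          std::greater<std::pair<int64_t, std::string>>> min_pq;
      int64_t min_bytes = std::numeric_limits<int64_t>::max();
      int64_t max_bytes = 0, total_bytes = 0, num_running = 0;
      for (const auto& q : queries) {
        min_pq.push(q);
        if (min_pq.size() > limit) min_pq.pop();
        min_bytes = std::min(min_bytes, q.first);
        max_bytes = std::max(max_bytes, q.first);
        total_bytes += q.first;
        ++num_running;
      }
      // Drain the heap and reverse so the heaviest queries come first.
      std::vector<std::pair<int64_t, std::string>> top;
      while (!min_pq.empty()) { top.push_back(min_pq.top()); min_pq.pop(); }
      std::reverse(top.begin(), top.end());
      for (const auto& q : top)
        std::cout << q.second << " consumed " << (q.first >> 20) << " MB" << std::endl;
      std::cout << "num_running=" << num_running
                << ", min=" << (min_bytes >> 20) << " MB"
                << ", max=" << (max_bytes >> 20) << " MB"
                << ", total=" << (total_bytes >> 20) << " MB"
                << ", average_per_query=" << ((total_bytes / num_running) >> 20) << " MB"
                << std::endl;
      return 0;
    }

Bounding the heap at the limit keeps the selection cost at O(n log limit)
over n query mem trackers, which matches how the new
MemTracker::GetTopNQueriesAndUpdatePoolStats() below handles THeavyMemoryQuery
entries.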

diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 44bd9f4..de556e1 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -360,5 +360,42 @@ TEST(MemTestTest, GcFunctions) {
   // Clean up.
   t.Release(10);
 }
+
+// Test that we can compute topN queries from a hierarchy of mem trackers. These
+// queries are represented by 100 query mem trackers.
+TEST(MemTestTest, TopN) {
+  MemTracker root;
+  root.Consume(10);
+
+  static const int NUM_QUERY_MEM_TRACKERS = 100;
+  // Populate this many query mem trackers, each with some memory consumption.
+  std::vector<MemTracker*> trackers;
+  for (int i = 0; i < NUM_QUERY_MEM_TRACKERS; i++) {
+    MemTracker* tracker = new MemTracker(-1, "", &root);
+    tracker->query_id_.hi = 0;
+    tracker->query_id_.lo = i;
+    tracker->Consume(int64_t(i + 1));
+    tracker->is_query_mem_tracker_ = true;
+    trackers.push_back(tracker);
+  }
+  // Ready to compute top 5 queries which should be the last 5 of those
+  // populated above. The result is to be saved in pool_stats.heavy_memory_queries.
+  TPoolStats pool_stats;
+  root.UpdatePoolStatsForQueries(5, pool_stats);
+  // Validate the top entries
+  for (int i = 0; i < 5; i++) {
+    EXPECT_EQ(pool_stats.heavy_memory_queries[i].queryId.hi, 0);
+    EXPECT_EQ(
+        pool_stats.heavy_memory_queries[i].queryId.lo, NUM_QUERY_MEM_TRACKERS - i - 1);
+    EXPECT_EQ(
+        pool_stats.heavy_memory_queries[i].memory_consumed, NUM_QUERY_MEM_TRACKERS - i);
+  }
+  // Delete the allocated query mem trackers.
+  for (int i = 0; i < NUM_QUERY_MEM_TRACKERS; i++) {
+    trackers[i]->Release(int64_t(i + 1));
+    delete trackers[i];
+  }
+  root.Release(10);
+}
 }
 
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 6db0dc8..96fd3f9 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -31,6 +31,7 @@
 #include "util/pretty-printer.h"
 #include "util/test-info.h"
 #include "util/uid-util.h"
+#include "util/container-util.h"
 
 #include "common/names.h"
 
@@ -404,6 +405,69 @@ void MemTracker::GetTopNQueries(
   }
 }
 
+// Update the memory consumption related fields in pool_stats.
+void MemTracker::UpdatePoolStatsForMemoryConsumed(
+    int64_t mem_consumed, TPoolStats& pool_stats) {
+  if (pool_stats.min_memory_consumed > mem_consumed) {
+    pool_stats.min_memory_consumed = mem_consumed;
+  }
+  if (pool_stats.max_memory_consumed < mem_consumed) {
+    pool_stats.max_memory_consumed = mem_consumed;
+  }
+  pool_stats.total_memory_consumed += mem_consumed;
+  pool_stats.num_running++;
+}
+
+// Update pool_stats for all queries tracked through query memory trackers:
+void MemTracker::UpdatePoolStatsForQueries(int limit, TPoolStats& pool_stats) {
+  if (limit == 0) return;
+  // Init all memory consumption related fields
+  pool_stats.heavy_memory_queries.clear();
+  pool_stats.min_memory_consumed = std::numeric_limits<int64_t>::max();
+  pool_stats.max_memory_consumed = 0;
+  pool_stats.total_memory_consumed = 0;
+  pool_stats.num_running = 0;
+  // Collect the top 'limit' queries into 'min_pq'.
+  MinPriorityQueue min_pq;
+  GetTopNQueriesAndUpdatePoolStats(min_pq, limit, pool_stats);
+  // Grab all remaining entries from the priority queue and assign them in the descending
+  // order of memory consumption to the pool_stats.heavy_memory_queries field in
+  // pool_stats.
+  auto& queries = pool_stats.heavy_memory_queries;
+  queries.clear();
+  while (!min_pq.empty()) {
+    queries.push_back(min_pq.top());
+    min_pq.pop();
+  }
+  std::reverse(queries.begin(), queries.end());
+  // If not a single query is found, set the min_memory_consumed to 0.
+  if (pool_stats.num_running == 0) {
+    pool_stats.min_memory_consumed = 0;
+  }
+}
+
+void MemTracker::GetTopNQueriesAndUpdatePoolStats(
+    MinPriorityQueue& min_pq, int limit, TPoolStats& pool_stats) {
+  lock_guard<SpinLock> l(child_trackers_lock_);
+  for (MemTracker* tracker : child_trackers_) {
+    if (!tracker->is_query_mem_tracker_) {
+      tracker->GetTopNQueriesAndUpdatePoolStats(min_pq, limit, pool_stats);
+    } else {
+      DCHECK(tracker->is_query_mem_tracker_) << label_;
+      int64_t mem_consumed = tracker->consumption();
+
+      THeavyMemoryQuery heavy_memory_query;
+      heavy_memory_query.__set_memory_consumed(mem_consumed);
+      heavy_memory_query.__set_queryId(tracker->query_id_);
+
+      min_pq.push(heavy_memory_query);
+      if (min_pq.size() > limit) min_pq.pop();
+
+      UpdatePoolStatsForMemoryConsumed(mem_consumed, pool_stats);
+    }
+  }
+}
+
 MemTracker* MemTracker::GetQueryMemTracker() {
   MemTracker* tracker = this;
   while (tracker != nullptr && !tracker->is_query_mem_tracker_) {
@@ -412,6 +476,14 @@ MemTracker* MemTracker::GetQueryMemTracker() {
   return tracker;
 }
 
+MemTracker* MemTracker::GetRootMemTracker() {
+  MemTracker* ancestor = this;
+  while (ancestor && ancestor->parent()) {
+    ancestor = ancestor->parent();
+  }
+  return ancestor;
+}
+
 Status MemTracker::MemLimitExceeded(MemTracker* mtracker, RuntimeState* state,
     const std::string& details, int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index fc9c98e..1e13465 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -39,7 +39,9 @@
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
+#include "gen-cpp/StatestoreService.h" // for TPoolStats
 #include "gen-cpp/Types_types.h" // for TUniqueId
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
 
 namespace impala {
 
@@ -329,8 +331,8 @@ class MemTracker {
   /// for the process MemTracker.
   /// TODO: once all memory is accounted in ReservationTracker hierarchy, move
   /// reporting there.
-  std::string LogUsage(int max_recursive_depth,
-      const std::string& prefix = "", int64_t* logged_consumption = nullptr);
+  std::string LogUsage(int max_recursive_depth, const std::string& prefix = "",
+      int64_t* logged_consumption = nullptr);
   /// Dumping the process MemTracker is expensive. Limiting the recursive depth
   /// to two levels limits the level of detail to a one-line summary for each query
   /// MemTracker, avoiding all MemTrackers below that level. This provides a summary
@@ -344,6 +346,16 @@ class MemTracker {
   /// consumption.
   std::string LogTopNQueries(int limit);
 
+  /// Update the following data members in pool_stats for all queries tracked through
+  /// query memory trackers:
+  ///   heavy_memory_queries: the query Ids of top 'limit' queries in memory consumption
+  ///   (in descending order)
+  ///   min_memory_consumed: the minimal memory consumed among all queries
+  ///   max_memory_consumed: the maximal memory consumed among all queries
+  ///   total_memory_consumed: the total memory consumed by all queries
+  ///   num_running: the total number of all queries
+  void UpdatePoolStatsForQueries(int limit, TPoolStats& pool_stats);
+
   /// Log the memory usage when memory limit is exceeded and return a status object with
   /// details of the allocation which caused the limit to be exceeded.
   /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
@@ -399,14 +411,17 @@ class MemTracker {
   /// and populates 'min_pq' with 'limit' number of elements (that contain state related
   /// to query MemTrackers) based on maximum total memory consumption.
   void GetTopNQueries(
-      std::priority_queue<pair<int64_t, string>,
-          vector<pair<int64_t, string>>, std::greater<pair<int64_t, string>>>& min_pq,
+      std::priority_queue<pair<int64_t, string>, vector<pair<int64_t, string>>,
+          std::greater<pair<int64_t, string>>>& min_pq,
       int limit);
 
   /// If an ancestor of this tracker is a query MemTracker, return that tracker.
   /// Otherwise return NULL.
   MemTracker* GetQueryMemTracker();
 
+  /// Return the root ancestor of this tracker.
+  MemTracker* GetRootMemTracker();
+
   /// Increases/Decreases the consumption of this tracker and the ancestors up to (but
   /// not including) end_tracker.
   void ChangeConsumption(int64_t bytes, MemTracker* end_tracker) {
@@ -421,6 +436,26 @@ class MemTracker {
     DCHECK(false) << "end_tracker is not an ancestor";
   }
 
+  /// Update the following fields in pool_stats.
+  ///   min_memory_consumed: take the min of min_memory_consumed and mem_consumed
+  ///   max_memory_consumed: take the max of max_memory_consumed and mem_consumed
+  ///   total_memory_consumed: increased by mem_consumed
+  ///   num_running++
+  void UpdatePoolStatsForMemoryConsumed(int64_t mem_consumed, TPoolStats& pool_stats);
+
+  /// Collect the top N queries into a priority queue, and compute the min, the max,
+  /// the total memory consumption, and the total number of queries across all
+  /// child query mem trackers. The top element in the queue is the minimum, so
+  /// that the elements remaining after a sequence of pop operations are the
+  /// largest in memory consumption. This method should only be called for
+  /// mem trackers that are either query mem trackers or higher in the mem tracker
+  /// hierarchy.
+  typedef std::priority_queue<THeavyMemoryQuery, std::vector<THeavyMemoryQuery>,
+      std::greater<THeavyMemoryQuery>>
+      MinPriorityQueue;
+  void GetTopNQueriesAndUpdatePoolStats(
+      MinPriorityQueue& min_pq, int limit, TPoolStats& pool_stats);
+
   /// Lock to protect GcMemory(). This prevents many GCs from occurring at once.
   std::mutex gc_lock_;
 
@@ -505,6 +540,10 @@ class MemTracker {
 
   /// Metric for limit_.
   IntGauge* limit_metric_;
+
+  friend class AdmissionControllerTest;
+  friend class MemTestTest_TopN_Test;
+  FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
 };
 
 /// Global registry for query and pool MemTrackers. Owned by ExecEnv.
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 6172ee4..16a22c3 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -22,6 +22,7 @@
 #include "kudu/util/logging_test_util.h"
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
 #include "scheduling/schedule-state.h"
@@ -30,6 +31,7 @@
 #include "testutil/gtest-util.h"
 #include "testutil/scoped-flag-setter.h"
 #include "util/metrics.h"
+#include <regex>
 
 // Access the flags that are defined in RequestPoolService.
 DECLARE_string(fair_scheduler_allocation_path);
@@ -47,12 +49,16 @@ static const string QUEUE_C = "root.queueC";
 static const string QUEUE_D = "root.queueD";
 
 // Host names
+static const string HOST_0 = "host0:25000";
 static const string HOST_1 = "host1:25000";
 static const string HOST_2 = "host2:25000";
 
 static const int64_t MEGABYTE = 1024L * 1024L;
 static const int64_t GIGABYTE = 1024L * MEGABYTE;
 
+// The default version of the heavy memory query list.
+static std::vector<THeavyMemoryQuery> empty_heavy_memory_query_list;
+
 /// Parent class for Admission Controller tests.
 /// Common code and constants should go here.
 /// These are single threaded tests so we access the internal data structures of
@@ -169,13 +175,54 @@ class AdmissionControllerTest : public testing::Test {
     return delta;
   }
 
+  // Form the hi part of a query Id which is an enumeration of the pool name.
+  static int64_t FormQueryIdHi(const std::string& pool_name) {
+    // Translate pool name to pool Id
+    int pool_id = 0; // for pool QUEUE_A
+    if (pool_name == QUEUE_B) {
+      pool_id = 1;
+    } else if (pool_name == QUEUE_C) {
+      pool_id = 2;
+    } else if (pool_name == QUEUE_D) {
+      pool_id = 3;
+    }
+    return ((int64_t)pool_id);
+  }
+
+  /// Build a vector of THeavyMemoryQuery objects with "queries" queries. The id of each
+  /// query is composed of the pool id and a sequence number. The memory consumed by the
+  /// query is a number randomly chosen between 1MB and 20MB.
+  static std::vector<THeavyMemoryQuery> MakeHeavyMemoryQueryList(
+      const std::string pool, const int queries) {
+    // Generate the query list
+    std::vector<THeavyMemoryQuery> query_list;
+    int64_t hi = FormQueryIdHi(pool);
+    for (int i = 0; i < queries; i++) {
+      THeavyMemoryQuery query;
+      query.memory_consumed = (rand() % 20 + 1) * MEGABYTE;
+      query.queryId.hi = hi;
+      query.queryId.lo = i;
+      query_list.emplace_back(query);
+    }
+    return query_list;
+  }
+
   /// Build a TPoolStats object.
   static TPoolStats MakePoolStats(const int backend_mem_reserved,
-      const int num_admitted_running, const int num_queued) {
+      const int num_admitted_running, const int num_queued,
+      const std::vector<THeavyMemoryQuery>& heavy_memory_queries =
+          empty_heavy_memory_query_list,
+      const int64_t min_memory_consumed = 0, const int64_t max_memory_consumed = 0,
+      const int64_t total_memory_consumed = 0, const int64_t num_running = 0) {
     TPoolStats stats;
     stats.backend_mem_reserved = backend_mem_reserved;
     stats.num_admitted_running = num_admitted_running;
     stats.num_queued = num_queued;
+    stats.heavy_memory_queries = heavy_memory_queries;
+    stats.min_memory_consumed = min_memory_consumed;
+    stats.max_memory_consumed = max_memory_consumed;
+    stats.total_memory_consumed = total_memory_consumed;
+    stats.num_running = num_running;
     return stats;
   }
 
@@ -241,8 +288,8 @@ class AdmissionControllerTest : public testing::Test {
     addr->__set_port(25000);
     ClusterMembershipMgr* cmm =
         pool_.Add(new ClusterMembershipMgr("", nullptr, metric_group));
-    return pool_.Add(new AdmissionController(
-        cmm, nullptr, request_pool_service, metric_group, *addr));
+    return pool_.Add(
+        new AdmissionController(cmm, nullptr, request_pool_service, metric_group, *addr));
   }
 
   static void checkPoolDisabled(
@@ -252,6 +299,13 @@ class AdmissionControllerTest : public testing::Test {
     pool_config.max_mem_resources = max_mem_resources;
     ASSERT_EQ(expected_result, AdmissionController::PoolDisabled(pool_config));
   }
+
+  void ResetMemConsumed(MemTracker* tracker) {
+    tracker->consumption_->Set(0);
+    for (MemTracker* child : tracker->child_trackers_) {
+      ResetMemConsumed(child);
+    }
+  }
 };
 
 /// Test that AdmissionController will admit a query into a pool, then simulate other
@@ -279,14 +333,14 @@ TEST_F(AdmissionControllerTest, Simple) {
   // Check that the query can be admitted.
   string not_admitted_reason;
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_c, true, &not_admitted_reason));
+      *schedule_state, config_c, true, &not_admitted_reason, nullptr));
 
   // Create a ScheduleState just like 'schedule_state' to run on 3 hosts which can't be
   // admitted.
   ScheduleState* schedule_state_3_hosts =
       MakeScheduleState(QUEUE_C, config_c, 3, 64L * MEGABYTE);
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state_3_hosts, config_c, true, &not_admitted_reason));
+      *schedule_state_3_hosts, config_c, true, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(not_admitted_reason,
       "Not enough aggregate memory available in pool root.queueC with max mem "
       "resources 128.00 MB. Needed 192.00 MB but only 128.00 MB was available.");
@@ -317,9 +371,9 @@ TEST_F(AdmissionControllerTest, Simple) {
 
   // Test that the query cannot be admitted now.
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_c, true, &not_admitted_reason));
-  EXPECT_STR_CONTAINS(not_admitted_reason,
-      "number of running queries 11 is at or over limit 10.");
+      *schedule_state, config_c, true, &not_admitted_reason, nullptr));
+  EXPECT_STR_CONTAINS(
+      not_admitted_reason, "number of running queries 11 is at or over limit 10.");
 }
 
 /// Test CanAdmitRequest in the context of aggregated memory required to admit a query.
@@ -348,7 +402,7 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) {
   // Check that the query can be admitted.
   string not_admitted_reason;
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_d, true, &not_admitted_reason));
+      *schedule_state, config_d, true, &not_admitted_reason, nullptr));
 
   // Tests that this query cannot be admitted.
   // Increasing the number of hosts pushes the aggregate memory required to admit this
@@ -357,7 +411,7 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) {
   ScheduleState* schedule_state15 =
       MakeScheduleState(QUEUE_D, config_d, host_count, 30L * MEGABYTE);
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state15, config_d, true, &not_admitted_reason));
+      *schedule_state15, config_d, true, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(not_admitted_reason,
       "Not enough aggregate memory available in pool root.queueD with max mem resources "
       "400.00 MB. Needed 480.00 MB but only 400.00 MB was available.");
@@ -369,7 +423,7 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestMemory) {
       MakeScheduleState(QUEUE_D, config_d, host_count, 50L * MEGABYTE);
 
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state_10_fail, config_d, true, &not_admitted_reason));
+      *schedule_state_10_fail, config_d, true, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(not_admitted_reason,
       "Not enough aggregate memory available in pool root.queueD with max mem resources "
       "400.00 MB. Needed 500.00 MB but only 400.00 MB was available.");
@@ -404,17 +458,17 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestCount) {
 
   // Query can be admitted from queue...
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_d, true, &not_admitted_reason));
+      *schedule_state, config_d, true, &not_admitted_reason, nullptr));
   // ... but same Query cannot be admitted directly.
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_d, false, &not_admitted_reason));
+      *schedule_state, config_d, false, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(not_admitted_reason,
       "queue is not empty (size 2); queued queries are executed first");
 
   // Simulate that there are 7 queries already running.
   pool_stats->agg_num_running_ = 7;
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *schedule_state, config_d, true, &not_admitted_reason));
+      *schedule_state, config_d, true, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(
       not_admitted_reason, "number of running queries 7 is at or over limit 6");
 }
@@ -454,10 +508,10 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) {
 
   // Enough slots are available so it can be admitted in both cases.
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *default_group_schedule, config_d, true, &not_admitted_reason))
+      *default_group_schedule, config_d, true, &not_admitted_reason, nullptr))
       << not_admitted_reason;
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *other_group_schedule, config_d, true, &not_admitted_reason))
+      *other_group_schedule, config_d, true, &not_admitted_reason, nullptr))
       << not_admitted_reason;
 
   // Simulate that almost all the slots are in use, which prevents admission in the
@@ -465,10 +519,10 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) {
   SetSlotsInUse(admission_controller, host_addrs, slots_per_host - 1);
 
   ASSERT_TRUE(admission_controller->CanAdmitRequest(
-      *default_group_schedule, config_d, true, &not_admitted_reason))
+      *default_group_schedule, config_d, true, &not_admitted_reason, nullptr))
       << not_admitted_reason;
   ASSERT_FALSE(admission_controller->CanAdmitRequest(
-      *other_group_schedule, config_d, true, &not_admitted_reason));
+      *other_group_schedule, config_d, true, &not_admitted_reason, nullptr));
   EXPECT_STR_CONTAINS(not_admitted_reason,
       "Not enough admission control slots available on host host1:25000. Needed 4 "
       "slots but 15/16 are already in use.");
@@ -605,8 +659,7 @@ TEST_F(AdmissionControllerTest, GetMaxToDequeue) {
 
   int64_t max_to_dequeue;
   // Queue is empty, so nothing to dequeue
-  max_to_dequeue =
-      admission_controller->GetMaxToDequeue(queue_c, stats_c, config_c);
+  max_to_dequeue = admission_controller->GetMaxToDequeue(queue_c, stats_c, config_c);
   ASSERT_EQ(0, max_to_dequeue);
 
   AdmissionController::PoolStats stats(admission_controller, "test");
@@ -618,20 +671,17 @@ TEST_F(AdmissionControllerTest, GetMaxToDequeue) {
   stats.local_stats_.num_queued = 10;
   stats.agg_num_queued_ = 20;
   stats.agg_num_running_ = 10;
-  max_to_dequeue =
-      admission_controller->GetMaxToDequeue(queue_c, &stats, config);
+  max_to_dequeue = admission_controller->GetMaxToDequeue(queue_c, &stats, config);
   ASSERT_EQ(0, max_to_dequeue);
 
   // Can only dequeue 1.
   stats.agg_num_running_ = 9;
-  max_to_dequeue =
-      admission_controller->GetMaxToDequeue(queue_c, &stats, config);
+  max_to_dequeue = admission_controller->GetMaxToDequeue(queue_c, &stats, config);
   ASSERT_EQ(1, max_to_dequeue);
 
   // There is space for 10 but it looks like there are 2 coordinators.
   stats.agg_num_running_ = 0;
-  max_to_dequeue =
-      admission_controller->GetMaxToDequeue(queue_c, &stats, config);
+  max_to_dequeue = admission_controller->GetMaxToDequeue(queue_c, &stats, config);
   ASSERT_EQ(5, max_to_dequeue);
 }
 
@@ -789,7 +839,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
 
   TPoolConfig pool_config;
   ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
-  pool_config.__set_max_mem_resources(2*GIGABYTE); // to enable memory based admission.
+  pool_config.__set_max_mem_resources(2 * GIGABYTE); // to enable memory based admission.
 
   // Set up a query schedule to test.
   const int64_t PER_EXEC_MEM_ESTIMATE = GIGABYTE;
@@ -904,4 +954,161 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
       "needed given the current plan: 1.00 GB");
 }
 
+/// Test that AdmissionController can identify 5 queries with top memory consumption
+/// from 4 pools. Each pool holds a number of queries with different memory consumptions.
+TEST_F(AdmissionControllerTest, TopNQueryCheck) {
+  // Pass the paths of the configuration files as command line flags
+  FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  // A vector of 4 specs. Each spec defines a pool with a number of queries running
+  // in the pool.
+  std::vector<std::tuple<std::string, TPoolConfig, int>> resource_pools(4);
+
+  // Establish the pool names
+  std::get<0>(resource_pools[0]) = QUEUE_A;
+  std::get<0>(resource_pools[1]) = QUEUE_B;
+  std::get<0>(resource_pools[2]) = QUEUE_C;
+  std::get<0>(resource_pools[3]) = QUEUE_D;
+
+  // Setup the pool config
+  for (int i = 0; i < 4; i++) {
+    const std::string& pool_name = std::get<0>(resource_pools[i]);
+    TPoolConfig& pool_config = std::get<1>(resource_pools[i]);
+    ASSERT_OK(request_pool_service->GetPoolConfig(pool_name, &pool_config));
+  }
+
+  // Set the number of queries to run in each pool
+  std::get<2>(resource_pools[0]) = 5;
+  std::get<2>(resource_pools[1]) = 10;
+  std::get<2>(resource_pools[2]) = 20;
+  std::get<2>(resource_pools[3]) = 20;
+
+  // Set the seed for the random generator used below to generate
+  // memory needed for each query.
+  srand(19);
+
+  MemTracker* pool_mem_tracker = nullptr;
+
+  // Process each of the 4 pools.
+  for (int i = 0; i < 4; i++) {
+    // Get the parameters of the pool.
+    const std::string& pool_name = std::get<0>(resource_pools[i]);
+    TPoolConfig& pool_config = std::get<1>(resource_pools[i]);
+    int num_queries = std::get<2>(resource_pools[i]);
+
+    // For all queries in the pool, fabricate the hi part of the query Id.
+    TUniqueId id;
+    id.hi = FormQueryIdHi(pool_name);
+
+    // Create a number of queries to run inside the pool.
+    for (int j = 0; j < num_queries; j++) {
+      // For each query, fabricate the lo part of the query Id.
+      id.lo = j;
+      // Next create a ScheduleState that runs on this host with per host
+      // memory limit as a random number between 1MB and 10MB.
+      long per_host_mem_estimate = (rand() % 10 + 1) * MEGABYTE;
+      ScheduleState* schedule_state =
+          MakeScheduleState(pool_name, pool_config, 1, per_host_mem_estimate);
+      schedule_state->UpdateMemoryRequirements(pool_config);
+      // Admit the query to the pool.
+      string not_admitted_reason;
+      ASSERT_TRUE(admission_controller->CanAdmitRequest(
+          *schedule_state, pool_config, true, &not_admitted_reason, nullptr));
+      // Create a query memory tracker for the query and set the memory consumption
+      // as per_host_mem_estimate.
+      int64_t mem_consumed = per_host_mem_estimate;
+      MemTracker::CreateQueryMemTracker(id, -1 /*mem_limit*/, pool_name, &pool_)
+          ->Consume(mem_consumed);
+    }
+    // Get the pool mem tracker.
+    pool_mem_tracker =
+        ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(
+            pool_name, false);
+    ASSERT_TRUE(pool_mem_tracker);
+    // Create the pool stats.
+    AdmissionController::PoolStats* pool_stats =
+        admission_controller->GetPoolStats(pool_name, true);
+    // Update the local stats in pool stats with up to 5 top queries.
+    pool_stats->UpdateMemTrackerStats();
+  }
+  //
+  // Next make a TopicDeltaMap describing some activity on HOST_1 and HOST_2.
+  //
+  TTopicDelta membership = MakeTopicDelta(false);
+  AddStatsToTopic(&membership, HOST_1, QUEUE_B,
+      MakePoolStats(1000, 1, 0, MakeHeavyMemoryQueryList(QUEUE_B, 5),
+          5 * MEGABYTE /*min*/, 20 * MEGABYTE /*max*/, 100 * MEGABYTE /*total*/,
+          4 /*running*/));
+  AddStatsToTopic(&membership, HOST_1, QUEUE_C,
+      MakePoolStats(5000, 10, 0, MakeHeavyMemoryQueryList(QUEUE_C, 2),
+          10 * MEGABYTE /*min*/, 200 * MEGABYTE /*max*/, 500 * MEGABYTE /*total*/,
+          40 /*running*/));
+  AddStatsToTopic(&membership, HOST_2, QUEUE_C,
+      MakePoolStats(5000, 1, 0, MakeHeavyMemoryQueryList(QUEUE_C, 5),
+          10 * MEGABYTE /*min*/, 2000 * MEGABYTE /*max*/, 10000 * MEGABYTE /*total*/,
+          100 /*running*/));
+  // Imitate the StateStore passing updates on query activity to the
+  // AdmissionController.
+  StatestoreSubscriber::TopicDeltaMap incoming_topic_deltas;
+  incoming_topic_deltas.emplace(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, membership);
+  vector<TTopicDelta> outgoing_topic_updates;
+  admission_controller->UpdatePoolStats(incoming_topic_deltas, &outgoing_topic_updates);
+
+  //
+  // Find the top 5 queries from these 4 pools in HOST_0
+  //
+  string mem_details_for_host0 =
+      admission_controller->GetLogStringForTopNQueriesOnHost(HOST_0);
+  // Verify that the 5 top ones appear in the following order.
+  std::regex pattern_pools_for_host0(".*"+
+       QUEUE_B+".*"+"id=0000000000000001:0000000000000002, consumed=10.00 MB"+".*"+
+       QUEUE_A+".*"+"id=0000000000000000:0000000000000000, consumed=10.00 MB"+".*"+
+       QUEUE_D+".*"+"id=0000000000000003:0000000000000011, consumed=9.00 MB"+".*"+
+                    "id=0000000000000003:000000000000000a, consumed=9.00 MB"+".*"+
+                    "id=0000000000000003:0000000000000007, consumed=9.00 MB"+".*"
+       ,std::regex::basic
+       );
+  ASSERT_TRUE(std::regex_match(mem_details_for_host0, pattern_pools_for_host0));
+
+  //
+  // Next find the top 5 queries from these 4 pools in HOST_1
+  //
+  string mem_details_for_host1 =
+      admission_controller->GetLogStringForTopNQueriesOnHost(HOST_1);
+  // Verify that the 5 top ones appear in the following order.
+  std::regex pattern_pools_for_host1(".*"+
+       QUEUE_B+".*"+"id=0000000000000001:0000000000000004, consumed=20.00 MB"+".*"+
+                    "id=0000000000000001:0000000000000003, consumed=19.00 MB"+".*"+
+                    "id=0000000000000001:0000000000000002, consumed=8.00 MB"+".*"+
+       QUEUE_C+".*"+"id=0000000000000002:0000000000000000, consumed=18.00 MB"+".*"+
+                    "id=0000000000000002:0000000000000001, consumed=12.00 MB"+".*"
+       ,std::regex::basic
+       );
+  ASSERT_TRUE(std::regex_match(mem_details_for_host1, pattern_pools_for_host1));
+  //
+  // Next find the top 5 queries from pool QUEUE_C among 3 hosts.
+  //
+  string mem_details_for_this_pool =
+      admission_controller->GetLogStringForTopNQueriesInPool(QUEUE_C);
+  // Verify that the 5 top ones appear in the following order.
+  std::regex pattern_aggregated(std::string(".*")+
+       "id=0000000000000002:0000000000000001, consumed=32.00 MB"+".*"+
+       "id=0000000000000002:0000000000000004, consumed=26.00 MB"+".*"+
+       "id=0000000000000002:0000000000000000, consumed=21.00 MB"+".*"+
+       "id=0000000000000002:0000000000000002, consumed=17.00 MB"+".*"+
+       "id=0000000000000002:000000000000000e, consumed=9.00 MB"+".*"
+       ,std::regex::basic
+       );
+  ASSERT_TRUE(std::regex_match(mem_details_for_this_pool, pattern_aggregated));
+
+  //
+  // Reset the consumption_ counter for all trackers so that TearDown() call
+  // can run cleanly.
+  ResetMemConsumed(pool_mem_tracker->GetRootMemTracker());
+}
+
 } // end namespace impala
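
As an aside on the checking style used by TopNQueryCheck above: the expected
ordering is asserted by matching the whole report against a basic-grammar
std::regex in which ".*" separators allow arbitrary text between the expected
entries. A minimal standalone sketch of that pattern (the report excerpt and
ids below are made up, not actual Impala output):

    #include <iostream>
    #include <regex>
    #include <string>

    int main() {
      // Made-up single-line excerpt; the real strings come from
      // AdmissionController::GetLogStringForTopNQueriesOnHost()/InPool().
      const std::string report =
          "pool_name=root.queueB: id=0001:0004, consumed=20.00 MB, "
          "id=0001:0003, consumed=19.00 MB";
      // Require the two entries to appear in this order, anything in between.
      const std::regex ordered(
          ".*consumed=20.00 MB.*consumed=19.00 MB.*", std::regex::basic);
      std::cout << (std::regex_match(report, ordered) ? "order ok" : "order wrong")
                << std::endl;
      return 0;
    }
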
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index b50d854..3575d6b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -261,12 +261,54 @@ static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
   return true;
 }
 
+// Append to ss a debug string for memory consumption part of the pool stats.
+// Here is one example.
+// topN_query_stats: queries=[554b016cf0f3a37f:9a1bfcfd00000000,
+// 464dcd9cc47d724b:9e6a3f6400000000, 2844275a1458bf1f:0bc5887500000000,
+// a449dbc7bcbd2af1:647e6ded00000000, 8c430ea5ad38e94a:3c27bf4400000000],
+// total_mem_consumed=1.26 MB, fraction_of_pool_total_mem=0.61; pool_level_stats:
+// num_running=10, min=0, max=257.48 KB, pool_total_mem=2.06 MB, average_per_query=210.74
+// KB
+void AdmissionController::PoolStats::AppendStatsForConsumedMemory(
+    stringstream& ss, const TPoolStats& stats) {
+  ss << "topN_query_stats: ";
+  ss << "queries=[";
+  int num_ids = stats.heavy_memory_queries.size();
+  int64_t total_memory_consumed_by_top_queries = 0;
+  for (int i = 0; i < num_ids; i++) {
+    auto& query = stats.heavy_memory_queries[i];
+    total_memory_consumed_by_top_queries += query.memory_consumed;
+    ss << PrintId(query.queryId);
+    if (i < num_ids - 1) ss << ", ";
+  }
+  ss << "], ";
+  ss << "total_mem_consumed="
+     << PrettyPrinter::PrintBytes(total_memory_consumed_by_top_queries);
+  int64_t total_memory_consumed = stats.total_memory_consumed;
+  if (total_memory_consumed > 0) {
+    ss << ", fraction_of_pool_total_mem=" << setprecision(2)
+       << float(total_memory_consumed_by_top_queries) / total_memory_consumed;
+  }
+  ss << "; ";
+
+  ss << "pool_level_stats: ";
+  ss << "num_running=" << stats.num_running << ", ";
+  ss << "min=" << PrettyPrinter::PrintBytes(stats.min_memory_consumed) << ", ";
+  ss << "max=" << PrettyPrinter::PrintBytes(stats.max_memory_consumed) << ", ";
+  ss << "pool_total_mem=" << PrettyPrinter::PrintBytes(total_memory_consumed);
+  if (stats.num_running > 0) {
+    ss << ", average_per_query="
+       << PrettyPrinter::PrintBytes(total_memory_consumed / stats.num_running);
+  }
+}
+
 // Return a debug string for the pool stats.
-static string DebugPoolStats(const TPoolStats& stats) {
+string AdmissionController::PoolStats::DebugPoolStats(const TPoolStats& stats) const {
   stringstream ss;
   ss << "num_admitted_running=" << stats.num_admitted_running << ", ";
   ss << "num_queued=" << stats.num_queued << ", ";
-  ss << "backend_mem_reserved=" << PrintBytes(stats.backend_mem_reserved);
+  ss << "backend_mem_reserved=" << PrintBytes(stats.backend_mem_reserved) << ", ";
+  AppendStatsForConsumedMemory(ss, stats);
   return ss.str();
 }
 
@@ -280,6 +322,282 @@ string AdmissionController::PoolStats::DebugString() const {
   return ss.str();
 }
 
+// Output the string 'value' with an indentation of 'n' space characters.
+// When eof is true, append a newline.
+static void OutputIndentedString(
+    stringstream& ss, int n, const std::string& value, bool eof = true) {
+  ss << std::string(n, ' ') << value;
+  if (eof) ss << std::endl;
+}
+
+// Return a string reporting top 5 queries with most memory consumed among all
+// pools in a host.
+//
+// Here is an example of the output string for two pools.
+//    pool_name=root.queueB:
+//      topN_query_stats:
+//         queries=[
+//            id=0000000000000001:0000000000000004, consumed=20.00 MB,
+//            id=0000000000000001:0000000000000003, consumed=19.00 MB,
+//            id=0000000000000001:0000000000000002, consumed=8.00 MB
+//         ],
+//         total_consumed=47.00 MB
+//         fraction_of_pool_total_mem=0.47
+//      all_query_stats:
+//         num_running=4,
+//         min=5.00 MB,
+//         max=20.00 MB,
+//         pool_total_mem=100.00 MB,
+//         average=25.00 MB
+//
+//   pool_name=root.queueC:
+//      topN_query_stats:
+//         queries=[
+//            id=0000000000000002:0000000000000000, consumed=18.00 MB,
+//            id=0000000000000002:0000000000000001, consumed=12.00 MB
+//         ],
+//         total_consumed=30.00 MB
+//         fraction_of_pool_total_mem=0.06
+//      all_query_stats:
+//         num_running=40,
+//         min=10.00 MB,
+//         max=200.00 MB,
+//         pool_total_mem=500.00 MB,
+//         average=12.50 MB
+string AdmissionController::GetLogStringForTopNQueriesOnHost(
+    const std::string& host_id) {
+  // All heavy memory queries about 'host_id' are the starting point. Collect them
+  // into listOfTopNs.
+  stringstream ss;
+  std::vector<Item> listOfTopNs;
+  for (auto& it : pool_stats_) {
+    const TPoolStats* tpool_stats = (host_id_ == host_id) ?
+        &(it.second.local_stats()) :
+        it.second.FindTPoolStatsForRemoteHost(host_id);
+    if (!tpool_stats) continue;
+    for (auto& query : tpool_stats->heavy_memory_queries) {
+      listOfTopNs.emplace_back(
+          Item(query.memory_consumed, it.first, query.queryId, tpool_stats));
+    }
+  }
+  // If the number of items is 0, no need to go any further.
+  if (listOfTopNs.size() == 0) return "";
+
+  // Sort the list in descending order of memory consumed, pool name, qid and
+  // the address of TPoolStats.
+  sort(listOfTopNs.begin(), listOfTopNs.end(), std::greater<Item>());
+
+  // Decide the number of topN items to report from the list
+  int items = (listOfTopNs.size() >= 5) ? 5 : listOfTopNs.size();
+  // Keep first 'items' items and remove the rest.
+  listOfTopNs.resize(items);
+
+  int indent = 0;
+  OutputIndentedString(ss, indent, "", true);
+  OutputIndentedString(ss, indent, std::string("Stats for host ") + host_id);
+
+  // Use an integer vector to remember the indices of items in listOfTopNs
+  // that belong to the same pool.
+  std::vector<int> indices;
+  while (items > 0) {
+    // The first item in the list becomes 'current'.
+    indices.clear();
+    auto& current = listOfTopNs[0];
+    indices.push_back(0);
+    // Look for all other items with identical pool name as 'current'.
+    for (int j=1; j < items; j++) {
+      auto& next = listOfTopNs[j];
+      // Check on the pool name
+      if (getName(current) == getName(next)) indices.push_back(j);
+    }
+
+    // Process a new group of items with each's entry index contained in
+    // 'indices'. All of them are in the same pool.
+    AppendHeavyMemoryQueriesForAPoolInHostAtIndices(ss, listOfTopNs, indices, indent+3);
+    // Remove elements just processed.
+    for (int i = indices.size() - 1; i >= 0; i--) {
+      listOfTopNs.erase(listOfTopNs.begin() + indices[i]);
+    }
+    // The number of items remaining in the list.
+    items = listOfTopNs.size();
+  }
+  return ss.str();
+}
+
+// Return a string reporting top 5 queries with most memory consumed among all
+// hosts in a pool.
+// Here is one example.
+//      topN_query_stats:
+//         queries=[
+//            id=0000000200000002:0000000000000001, consumed=20.00 MB,
+//            id=0000000200000002:0000000000000004, consumed=18.00 MB,
+//            id=0000000100000002:0000000000000000, consumed=18.00 MB,
+//            id=0000000100000002:0000000000000001, consumed=12.00 MB,
+//            id=0000000200000002:0000000000000002, consumed=9.00 MB
+//         ],
+//         total_consumed=77.00 MB
+//         fraction_of_pool_total_mem=0.6
+string AdmissionController::GetLogStringForTopNQueriesInPool(
+    const std::string& pool_name) {
+  // All stats in pool_stats are the starting point to collect top N queries.
+  PoolStats* pool_stats = GetPoolStats(pool_name, true);
+
+  std::vector<Item> listOfTopNs;
+
+  // Collect for local stats
+  const TPoolStats& local = pool_stats->local_stats();
+  for (auto& query : local.heavy_memory_queries) {
+    listOfTopNs.emplace_back(
+      Item(query.memory_consumed, host_id_, query.queryId, nullptr));
+  }
+
+  // Collect for all remote stats
+  for (auto& it : pool_stats->remote_stats()) {
+    const TPoolStats& remote_stats = it.second;
+    for (auto& query : remote_stats.heavy_memory_queries) {
+      listOfTopNs.emplace_back(
+          Item(query.memory_consumed, it.first /*host id*/, query.queryId, nullptr));
+    }
+  }
+
+  // If the number of items is 0, no need to go any further.
+  if (listOfTopNs.size() == 0) return "";
+
+  // Group items by queryId.
+  sort(listOfTopNs.begin(), listOfTopNs.end(), [&](const Item& lhs, const Item& rhs) {
+    return getTUniqueId(lhs) < getTUniqueId(rhs);
+  });
+  // Compute the total mem consumed by all these queries.
+  int64_t init_value = 0;
+  int64_t total_mem_consumed = std::accumulate(listOfTopNs.begin(), listOfTopNs.end(),
+      init_value, [&](auto sum, const auto& x) { return sum + getMemConsumed(x); });
+  // Next aggregate on mem_consumed for each group. First define a list of
+  // items that will receive the aggregates.
+  std::vector<Item> listOfAggregatedItems;
+  auto it = listOfTopNs.begin();
+  while (it != listOfTopNs.end()) {
+    // Find a span of items identical in queryId. The span is defined by [it, next)
+    auto next = it;
+    next++;
+    while (next != listOfTopNs.end() && getTUniqueId(*it) == getTUniqueId(*next)) {
+      next++;
+    }
+    // Aggregate over mem_consumed for items in the span.
+    init_value = 0;
+    auto sum_mem_consumed = std::accumulate(it, next, init_value,
+        [&](auto sum, const auto& x) { return sum + getMemConsumed(x); });
+    // Append a new Item at the end of listOfAggregatedItems.
+    listOfAggregatedItems.emplace_back(
+        Item(sum_mem_consumed, pool_name, getTUniqueId(*it), nullptr));
+    // Advance 'it' to possibly start a new span
+    it = next;
+  }
+  // Sort the list in descending order of memory consumed and queryId.
+  sort(listOfAggregatedItems.begin(), listOfAggregatedItems.end(),
+      std::greater<Item>());
+  // Decide the number of topN items to report from the list
+  int items = (listOfAggregatedItems.size() >= 5) ?
+      5 :
+      listOfAggregatedItems.size();
+  // Keep first 'items' items and remove the rest.
+  listOfAggregatedItems.resize(items);
+  // Now we are ready to report the stats.
+  // Prepare an index object that indicates the reporting for all elements.
+  std::vector<int> indices;
+  for (int i=0; i<items; i++) indices.emplace_back(i);
+  int indent = 0;
+  stringstream ss;
+  // Report the title.
+  OutputIndentedString(ss, indent, "", true);
+  OutputIndentedString(
+      ss, indent, std::string("Aggregated stats for pool ") + pool_name + ":");
+  // Report the topN aggregated queries.
+  indent += 3;
+  ReportTopNQueriesAtIndices(
+      ss, listOfAggregatedItems, indices, indent, total_mem_consumed);
+  return ss.str();
+}
+
+// Report the topN queries section in a string and append it to 'ss'.
+void AdmissionController::ReportTopNQueriesAtIndices(stringstream& ss,
+    std::vector<Item>& listOfTopNs, std::vector<int>& indices, int indent,
+    int64_t total_mem_consumed) const {
+  OutputIndentedString(ss, indent, "topN_query_stats: ");
+  indent += 3;
+  OutputIndentedString(ss, indent, "queries=[");
+  int items = indices.size();
+  int64_t total_mem_consumed_by_top_queries = 0;
+  indent += 3;
+  for (int i = 0; i < items; i++) {
+    // Fields in item: memory_consumed, name, queryId, &TPoolStats
+    const Item& item = listOfTopNs[indices[i]];
+    total_mem_consumed_by_top_queries += getMemConsumed(item);
+    // Print queryId.
+    OutputIndentedString(ss, indent, "id=", false);
+    ss << PrintId(getTUniqueId(item));
+    // Print mem consumed.
+    ss << ", consumed=" << PrintBytes(getMemConsumed(item));
+    if (i < items - 1) ss << ", ";
+    ss << std::endl;
+  }
+  indent -= 3;
+  OutputIndentedString(ss, indent, "],");
+  OutputIndentedString(ss, indent,
+      std::string("total_consumed=")
+          + PrintBytes(total_mem_consumed_by_top_queries));
+
+  // Lastly report the percentage of the total.
+  if ( total_mem_consumed > 0 ) {
+    stringstream local_ss;
+    local_ss << setprecision(2)
+             << (float)(total_mem_consumed_by_top_queries)
+            / total_mem_consumed;
+    OutputIndentedString(
+        ss, indent, std::string("fraction_of_pool_total_mem=") + local_ss.str());
+  }
+}
+
+// Append a new string to 'ss' describing queries running in a pool on
+// a host:
+//  1. The pool name;
+//  2. The top-N queries with the most memory consumption among these queries;
+//  3. Statistics about all queries
+void AdmissionController::AppendHeavyMemoryQueriesForAPoolInHostAtIndices(
+    stringstream& ss, std::vector<Item>& listOfTopNs, std::vector<int>& indices,
+    int indent) const {
+  DCHECK_GT(indices.size(), 0);
+  const Item& first_item = listOfTopNs[indices[0]];
+  const string& pool_name = getName(first_item);
+  // Report the pool name.
+  OutputIndentedString(ss, indent, std::string("pool_name=") + pool_name + ": ");
+  // Report topN queries.
+  indent += 3;
+  const TPoolStats* tpool_stats = getTPoolStats(first_item);
+  int64_t total_mem_consumed = getTPoolStats(first_item)->total_memory_consumed;
+  ReportTopNQueriesAtIndices(
+      ss, listOfTopNs, indices, indent, total_mem_consumed);
+
+  // Report stats about all queries
+  OutputIndentedString(ss, indent, "all_query_stats: ");
+  indent += 3;
+  OutputIndentedString(ss, indent, "num_running=", false);
+  ss << tpool_stats->num_running << ", " << std::endl;
+
+  OutputIndentedString(ss, indent, "min=", false);
+  ss << PrintBytes(tpool_stats->min_memory_consumed) << ", " << std::endl;
+
+  OutputIndentedString(ss, indent, "max=", false);
+  ss << PrintBytes(tpool_stats->max_memory_consumed) << ", " << std::endl;
+
+  OutputIndentedString(ss, indent, "pool_total_mem=", false);
+  ss << PrintBytes(total_mem_consumed) << ", " << std::endl;
+  if (tpool_stats->num_running > 0) {
+    OutputIndentedString(ss, indent, "average=", false);
+    ss << PrintBytes(total_mem_consumed / tpool_stats->num_running)
+       << std::endl;
+  }
+}
+
 // TODO: do we need host_id_ to come from host_addr or can it just take the same id
 // the Scheduler has (coming from the StatestoreSubscriber)?
 AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membership_mgr,
@@ -483,7 +801,8 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(const ScheduleStat
 }
 
 bool AdmissionController::HasAvailableMemResources(const ScheduleState& state,
-    const TPoolConfig& pool_cfg, string* mem_unavailable_reason) {
+    const TPoolConfig& pool_cfg, string* mem_unavailable_reason,
+    string* not_admitted_details) {
   const string& pool_name = state.request_pool();
   const int64_t pool_max_mem = GetMaxMemForPool(pool_cfg);
   // If the pool doesn't have memory resources configured, always true.
@@ -508,6 +827,11 @@ bool AdmissionController::HasAvailableMemResources(const ScheduleState& state,
         PrintBytes(pool_max_mem), PrintBytes(cluster_mem_to_admit),
         PrintBytes(max(pool_max_mem - stats->EffectiveMemReserved(), 0L)),
         GetStalenessDetailLocked(" "));
+    // Find info about the top-N queries with most memory consumption from both
+    // local and remote stats in this pool.
+    if ( not_admitted_details ) {
+      *not_admitted_details = GetLogStringForTopNQueriesInPool(pool_name);
+    }
     return false;
   }
 
@@ -531,6 +855,11 @@ bool AdmissionController::HasAvailableMemResources(const ScheduleState& state,
           Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
               PrintBytes(max(admit_mem_limit - effective_host_mem_reserved, 0L)),
               PrintBytes(admit_mem_limit), GetStalenessDetailLocked(" "));
+      // Find info about the top-N queries with most memory consumption from all
+      // pools at this host.
+      if ( not_admitted_details ) {
+        *not_admitted_details = GetLogStringForTopNQueriesOnHost(host_id);
+      }
       return false;
     }
   }
@@ -565,7 +894,8 @@ bool AdmissionController::HasAvailableSlots(
 }
 
 bool AdmissionController::CanAdmitRequest(const ScheduleState& state,
-    const TPoolConfig& pool_cfg, bool admit_from_queue, string* not_admitted_reason) {
+    const TPoolConfig& pool_cfg, bool admit_from_queue,
+    string* not_admitted_reason, string* not_admitted_details) {
   // Can't admit if:
   //  (a) There are already queued requests (and this is not admitting from the queue).
   //  (b) The resource pool is already at the maximum number of requests.
@@ -595,7 +925,8 @@ bool AdmissionController::CanAdmitRequest(const ScheduleState& state,
     // TODO(IMPALA-8757): Extend slot based admission to default executor group
     return false;
   }
-  if (!HasAvailableMemResources(state, pool_cfg, not_admitted_reason)) {
+  if (!HasAvailableMemResources(
+          state, pool_cfg, not_admitted_reason, not_admitted_details)) {
     return false;
   }
   return true;
@@ -861,6 +1192,10 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
     RequestQueue* queue = &request_queue_map_[pool_name];
     VLOG_QUERY << "Queuing, query id=" << PrintId(request.query_id)
                << " reason: " << queue_node.not_admitted_reason;
+    if (queue_node.not_admitted_details.size() > 0) {
+      VLOG_RPC << "Top mem consuming queries: "
+               << queue_node.not_admitted_details;
+    }
     initial_queue_reason = queue_node.not_admitted_reason;
     stats->Queue();
     queue->Enqueue(&queue_node);
@@ -914,8 +1249,9 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
       pool_stats->Dequeue(true);
       request.summary_profile->AddInfoString(
           PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT);
-      const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT,
-          queue_wait_timeout_ms, pool_name, queue_node.not_admitted_reason);
+      const ErrorMsg& rejected_msg =
+          ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT, queue_wait_timeout_ms, pool_name,
+              queue_node.not_admitted_reason, queue_node.not_admitted_details);
       VLOG_QUERY << rejected_msg.msg();
       return Status::Expected(rejected_msg);
     } else if (outcome == AdmissionOutcome::CANCELLED) {
@@ -1280,13 +1616,14 @@ bool AdmissionController::FindGroupToAdmitOrReject(
       return false;
     }
 
-    if (CanAdmitRequest(
-            *state, pool_config, admit_from_queue, &queue_node->not_admitted_reason)) {
+    if (CanAdmitRequest(*state, pool_config, admit_from_queue,
+            &queue_node->not_admitted_reason, &queue_node->not_admitted_details)) {
       queue_node->admitted_schedule = std::move(group_state.state);
       return true;
     } else {
       VLOG_RPC << "Cannot admit query " << queue_node->admission_request.query_id
-               << " to group " << group_name << ": " << queue_node->not_admitted_reason;
+               << " to group " << group_name << ": " << queue_node->not_admitted_reason
+               << " Details:" << queue_node->not_admitted_details;
     }
   }
   return true;
@@ -1298,6 +1635,13 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   MemTracker* tracker =
       ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
 
+  if (tracker) {
+    // Update local_stats_ with the query Ids of the top 5 queries, plus the min, the max,
+    // the total memory consumption, and the number of all queries running on this
+    // host tracked by this pool.
+    tracker->UpdatePoolStatsForQueries(5 /*limit*/, this->local_stats_);
+  }
+
   const int64_t current_reserved =
       tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
   if (current_reserved != local_stats_.backend_mem_reserved) {
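
GetLogStringForTopNQueriesInPool() above combines the per-host heavy-memory
lists by sorting the entries on query id and summing each span of identical
ids, then re-sorting by total consumption. A standalone sketch of that
sort-then-accumulate grouping, with made-up string ids standing in for
TUniqueId:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    int main() {
      // Made-up (query id, bytes consumed on one host) entries.
      std::vector<std::pair<std::string, int64_t>> per_host = {
          {"q2", 12 << 20}, {"q1", 18 << 20}, {"q2", 20 << 20}, {"q1", 9 << 20}};
      // Sort on query id so entries for the same query form a contiguous span.
      std::sort(per_host.begin(), per_host.end());
      std::vector<std::pair<std::string, int64_t>> aggregated;
      for (auto it = per_host.begin(); it != per_host.end();) {
        auto next = it;
        int64_t sum = 0;
        while (next != per_host.end() && next->first == it->first) sum += (next++)->second;
        aggregated.emplace_back(it->first, sum);  // one aggregated entry per query id
        it = next;
      }
      // Heaviest consumers first, as in the reported top-N list.
      std::sort(aggregated.begin(), aggregated.end(),
          [](const auto& a, const auto& b) { return a.second > b.second; });
      for (const auto& q : aggregated)
        std::cout << q.first << " consumed " << (q.second >> 20) << " MB in total"
                  << std::endl;
      return 0;
    }
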
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 4e56317..4de33fd 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -534,11 +534,23 @@ class AdmissionController {
     /// aggregates when called over all pools.
     void UpdateAggregates(HostMemMap* host_mem_reserved);
 
-    const TPoolStats& local_stats() { return local_stats_; }
+    const TPoolStats& local_stats() const { return local_stats_; }
+
+    // A map from the id of a host to the TPoolStats about that host.
+    typedef boost::unordered_map<std::string, TPoolStats> RemoteStatsMap;
+    const RemoteStatsMap& remote_stats() const { return  remote_stats_; }
+
+    /// Return the TPoolStats for a remote host in remote_stats_ if it can be found.
+    /// Return nullptr otherwise.
+    TPoolStats* FindTPoolStatsForRemoteHost(const string& host_id) {
+      RemoteStatsMap::iterator it = remote_stats_.find(host_id);
+      return (it != remote_stats_.end()) ? &(it->second) : nullptr;
+    }
 
     /// Updates the metrics exposing the pool configuration to those in pool_cfg.
     void UpdateConfigMetrics(const TPoolConfig& pool_cfg);
 
+    const PoolMetrics* metrics() const { return &metrics_; }
     PoolMetrics* metrics() { return &metrics_; }
     std::string DebugString() const;
 
@@ -589,7 +601,6 @@ class AdmissionController {
 
     /// Map of host_ids to the latest TPoolStats. Entirely generated by incoming
     /// statestore updates; updated by UpdateRemoteStats() and used by UpdateAggregates().
-    typedef boost::unordered_map<std::string, TPoolStats> RemoteStatsMap;
     RemoteStatsMap remote_stats_;
 
     /// Per-pool metrics, created by InitMetrics().
@@ -611,15 +622,38 @@ class AdmissionController {
 
     void InitMetrics();
 
+    // Return a string about the content of a TPoolStats object.
+    std::string DebugPoolStats(const TPoolStats& stats) const;
+
+    // Append a string about the memory consumption part of a TPoolStats object to 'ss'.
+    static void AppendStatsForConsumedMemory(
+      std::stringstream& ss, const TPoolStats& stats);
+
     FRIEND_TEST(AdmissionControllerTest, Simple);
     FRIEND_TEST(AdmissionControllerTest, PoolStats);
     FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
     FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
     FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
     FRIEND_TEST(AdmissionControllerTest, QueryRejection);
+    FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
     friend class AdmissionControllerTest;
   };
 
+ private:
+  // Return a string reporting the top 5 queries with the most memory consumed
+  // among all pools on a host. The string is composed of up to 5 sections, where
+  // each section is about one pool and describes the top queries
+  // and stats about all queries in the pool. The total number of top queries
+  // in these sections is at most 5.
+  std::string GetLogStringForTopNQueriesOnHost(const std::string& host_id);
+
+  // Return a string reporting the top 5 queries with the most memory consumed
+  // among all hosts in the pool. The string is composed of up to 5 sections, where
+  // each section is about one host and describes the top queries
+  // and stats about them on the host. The total number of top queries
+  // in these sections is at most 5.
+  std::string GetLogStringForTopNQueriesInPool(const std::string& pool_name);
+
   /// Map of pool names to pool stats. Accessed via GetPoolStats().
   /// Protected by admission_ctrl_lock_.
   typedef boost::unordered_map<std::string, PoolStats> PoolStatsMap;
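
The new DebugPoolStats()/AppendStatsForConsumedMemory() helpers and the two
GetLogStringForTopNQueries* methods build these reports with a std::stringstream. Below
is a rough sketch of that style of formatting, using a simplified stand-in struct rather
than the generated TPoolStats; the field names mirror the new Thrift fields, but the
actual output format of the patch may differ.

    #include <cstdint>
    #include <iostream>
    #include <sstream>

    // Simplified stand-in for the generated TPoolStats.
    struct FakePoolStats {
      int64_t min_memory_consumed;
      int64_t max_memory_consumed;
      int64_t total_memory_consumed;
      int64_t num_running;
    };

    // Appends the memory consumption part of 'stats' to 'ss'.
    void AppendConsumedMemoryStats(std::stringstream& ss, const FakePoolStats& stats) {
      ss << "min_memory_consumed=" << stats.min_memory_consumed
         << ", max_memory_consumed=" << stats.max_memory_consumed
         << ", total_memory_consumed=" << stats.total_memory_consumed
         << ", num_running=" << stats.num_running;
    }

    int main() {
      std::stringstream ss;
      AppendConsumedMemoryStats(ss, FakePoolStats{1 << 20, 8 << 20, 20 << 20, 4});
      std::cout << ss.str() << std::endl;
      return 0;
    }
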
@@ -658,6 +692,7 @@ class AdmissionController {
         RuntimeProfile* profile)
       : admission_request(std::move(request)),
         profile(profile),
+        not_admitted_details("Not Applicable"),
         admit_outcome(admission_outcome) {}
 
     /////////////////////////////////////////
@@ -693,6 +728,13 @@ class AdmissionController {
     /// The last reason why this request could not be admitted.
     std::string not_admitted_reason;
 
+    /// Details of memory consumption populated in HasAvailableMemResources()
+    /// when pool or host memory consumption exceeds the limit. On pool memory
+    /// exhaustion, the details list the queries consuming the most memory across
+    /// all hosts. On host memory exhaustion, they list the queries consuming the
+    /// most memory together with aggregated stats across all pools on that host.
+    std::string not_admitted_details;
+
     /// The Admission outcome of the queued request.
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
 
@@ -821,9 +863,13 @@ class AdmissionController {
   /// Returns true if schedule can be admitted to the pool with pool_cfg.
   /// admit_from_queue is true if attempting to admit from the queue. Otherwise, returns
   /// false and not_admitted_reason specifies why the request can not be admitted
-  /// immediately. Caller owns not_admitted_reason. Must hold admission_ctrl_lock_.
+  /// immediately. When provided, not_admitted_details gives details of memory
+  /// consumption, including the top 5 queries consuming the most memory, when there
+  /// are not enough memory resources available for the query. Caller owns
+  /// not_admitted_reason and not_admitted_details. Must hold admission_ctrl_lock_.
   bool CanAdmitRequest(const ScheduleState& state, const TPoolConfig& pool_cfg,
-      bool admit_from_queue, std::string* not_admitted_reason);
+      bool admit_from_queue, std::string* not_admitted_reason,
+      std::string* not_admitted_details = nullptr);
 
   /// Returns true if all executors can accommodate the largest initial reservation of
   /// any executor and the backend running the coordinator fragment can accommodate its
@@ -847,7 +893,7 @@ class AdmissionController {
   /// 'mem_unavailable_reason'.
   /// Must hold admission_ctrl_lock_.
   bool HasAvailableMemResources(const ScheduleState& state, const TPoolConfig& pool_cfg,
-      std::string* mem_unavailable_reason);
+      std::string* mem_unavailable_reason, std::string* topN_queries = nullptr);
 
   /// Returns true if there are enough available slots on all executors in the schedule to
   /// fit the query schedule. The number of slots per executors does not change with the
@@ -1004,6 +1050,39 @@ class AdmissionController {
   /// admitted or released.
   void UpdateExecGroupMetric(const string& grp_name, int64_t delta);
 
+  /// A helper type that bundles the information needed to compute the topN queries out
+  /// of <n> lists of topM queries via a priority queue. Each object represents one query.
+  ///
+  /// Each field in the type is defined as follows.
+  ///   field 0: memory consumed;
+  ///   field 1: the name of a pool or a host;
+  ///   field 2: query id;
+  ///   field 3: a pointer to TPoolStats.
+
+  typedef std::tuple<int64_t, string, TUniqueId, const TPoolStats*> Item;
+  /// Get the memory consumed.
+  const int64_t& getMemConsumed(const Item& item) const { return std::get<0>(item); }
+  /// Get either the pool or host name.
+  const string& getName(const Item& item) const { return std::get<1>(item); }
+  /// Get the query Id.
+  const TUniqueId& getTUniqueId(const Item& item) const { return std::get<2>(item); }
+  /// Get the pointer to TPoolStats.
+  const TPoolStats* getTPoolStats(const Item& item) const { return std::get<3>(item); }
+
+  // Append a new string to 'ss' describing queries running in a pool on a host.
+  // These queries are a subset of items in listOfTopNs whose indices are in 'indices'.
+  // 'indent' specifies the number of spaces prefixing all lines in the resultant string.
+  void AppendHeavyMemoryQueriesForAPoolInHostAtIndices(std::stringstream& ss,
+      std::vector<Item>& listOfTopNs, std::vector<int>& indices, int indent) const;
+
+  // Report the topN queries in a string and append it to 'ss'. These queries are
+  // a subset of the items in listOfTopNs whose indices are in 'indices'. One query Id
+  // together with its memory consumed is reported per line. 'indent' provides the
+  // initial indentation. 'total_mem_consumed' is the total memory consumed, from
+  // which the fraction consumed by the topN queries can be reported.
+  void ReportTopNQueriesAtIndices(std::stringstream& ss, std::vector<Item>& listOfTopNs,
+      std::vector<int>& indices, int indent, int64_t total_mem_consumed) const;
+
   FRIEND_TEST(AdmissionControllerTest, Simple);
   FRIEND_TEST(AdmissionControllerTest, PoolStats);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
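
The Item tuple above feeds a priority queue that merges several per-pool or per-host
top-M lists into one global top-N. The following standalone sketch shows that selection
idea with the tuple simplified to (memory consumed, source name, query id) and purely
illustrative values; it is not the AdmissionController implementation.

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <string>
    #include <tuple>
    #include <vector>

    using Item = std::tuple<int64_t, std::string, std::string>;

    std::vector<Item> TopN(const std::vector<Item>& candidates, size_t n) {
      // Min-heap keyed on memory consumed: the smallest of the current top-N sits on
      // top and is evicted whenever a larger candidate arrives.
      std::priority_queue<Item, std::vector<Item>, std::greater<Item>> heap;
      for (const Item& item : candidates) {
        heap.push(item);
        if (heap.size() > n) heap.pop();
      }
      std::vector<Item> result;
      while (!heap.empty()) { result.push_back(heap.top()); heap.pop(); }
      return result;  // ascending by memory consumed
    }

    int main() {
      std::vector<Item> candidates = {
          Item{8 << 20, "host1", "q1"}, Item{2 << 20, "host1", "q2"},
          Item{16 << 20, "host2", "q3"}, Item{4 << 20, "host2", "q4"},
          Item{1 << 20, "host3", "q5"}, Item{32 << 20, "host3", "q6"}};
      for (const Item& item : TopN(candidates, 5)) {
        std::cout << std::get<1>(item) << " " << std::get<2>(item) << " "
                  << std::get<0>(item) << std::endl;
      }
      return 0;
    }

Because std::tuple compares lexicographically, keying the heap on the first element
orders candidates by memory consumed, with ties broken by the remaining fields.
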
@@ -1013,6 +1092,7 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, QueryRejection);
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordScheduleState);
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
+  FRIEND_TEST(AdmissionControllerTest, TopNQueryCheck);
   friend class AdmissionControllerTest;
 };
 
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index 4013ea9..c8900a4 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -30,6 +30,7 @@
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Status_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/common.pb.h"
 
 /// Comparators for types that we commonly use in containers.
@@ -116,6 +117,14 @@ inline bool operator==(const TCounter& lhs, const TCounter& rhs) {
       == std::tie(rhs.name, rhs.unit, rhs.value);
 }
 
+// THeavyMemoryQuery
+STATIC_ASSERT_SIZE(THeavyMemoryQuery, 40);
+
+inline bool operator>(const THeavyMemoryQuery& lhs, const THeavyMemoryQuery& rhs) {
+  return std::tie(lhs.memory_consumed, lhs.queryId)
+      > std::tie(rhs.memory_consumed, rhs.queryId);
+}
+
 /// Hash function for TNetworkAddress. This function must be called hash_value to be picked
 /// up properly by boost.
 inline std::size_t hash_value(const TNetworkAddress& host_port) {
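
The operator> added above uses the std::tie idiom: it compares memory_consumed first and
falls back to the query id to break ties. A small standalone example of the same idiom on
a simplified stand-in struct (not the generated THeavyMemoryQuery) is sketched below.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <tuple>
    #include <vector>

    struct Query {
      int64_t memory_consumed;
      std::string query_id;
    };

    // Lexicographic comparison: memory consumed first, then query id as a tie-breaker.
    inline bool operator>(const Query& lhs, const Query& rhs) {
      return std::tie(lhs.memory_consumed, lhs.query_id)
          > std::tie(rhs.memory_consumed, rhs.query_id);
    }

    int main() {
      std::vector<Query> queries = {{4 << 20, "q1"}, {16 << 20, "q2"}, {4 << 20, "q0"}};
      // Sort descending so the heaviest query comes first.
      std::sort(queries.begin(), queries.end(),
          [](const Query& a, const Query& b) { return a > b; });
      for (const Query& q : queries) {
        std::cout << q.query_id << " " << q.memory_consumed << std::endl;
      }
      return 0;
    }
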
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 041b0df..41dedb4 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -25,6 +25,17 @@ enum StatestoreServiceVersion {
    V1 = 0
 }
 
+// Description of a single entry in a list of heavy memory usage queries
+struct THeavyMemoryQuery {
+
+  // The memory consumption of the query
+  1: required i64 memory_consumed;
+
+  // The Id of the query
+  2: required Types.TUniqueId queryId;
+}
+
+
 // Structure serialized for the topic AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC.
 // Statistics for a single admission control pool. The topic key is of the form
 // "<pool_name>!<backend_id>".
@@ -42,6 +53,25 @@ struct TPoolStats {
   // execution on this impalad, this value increases by 10G. Any other impalads executing
   // this query will also increment their backend_mem_reserved by 10G.
   3: required i64 backend_mem_reserved;
+
+  // Additional statistics about queries running in the backend.
+  //
+  // List of the top-k queries by memory consumption.
+  4: required list<THeavyMemoryQuery> heavy_memory_queries;
+
+  // Min memory consumption among all queries.
+  5: required i64 min_memory_consumed;
+
+  // Max memory consumption among all queries.
+  6: required i64 max_memory_consumed;
+
+  // Total memory consumption among all queries.
+  7: required i64 total_memory_consumed;
+
+  // The number of queries in a pool that have live fragments taking up memory on the
+  // host. These queries must be tracked by query mem trackers. In comparison,
+  // num_admitted_running tracks the number of queries admitted on a host.
+  8: required i64 num_running;
 }
 
 // Description of a single entry in a topic
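
One plausible way the new TPoolStats fields could be derived from per-query memory
consumption on a backend is sketched below. The struct and helper are illustrative
stand-ins only; the actual population is done by the mem-tracker and admission-controller
changes in this patch and may differ in detail.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <limits>
    #include <string>
    #include <vector>

    struct HeavyMemoryQuery { int64_t memory_consumed; std::string query_id; };

    // Mirrors the new TPoolStats fields added above.
    struct PoolStatsLike {
      std::vector<HeavyMemoryQuery> heavy_memory_queries;
      int64_t min_memory_consumed = 0;
      int64_t max_memory_consumed = 0;
      int64_t total_memory_consumed = 0;
      int64_t num_running = 0;
    };

    PoolStatsLike SummarizePool(std::vector<HeavyMemoryQuery> queries, size_t top_k) {
      PoolStatsLike stats;
      stats.num_running = static_cast<int64_t>(queries.size());
      if (!queries.empty()) {
        stats.min_memory_consumed = std::numeric_limits<int64_t>::max();
      }
      for (const auto& q : queries) {
        stats.min_memory_consumed = std::min(stats.min_memory_consumed, q.memory_consumed);
        stats.max_memory_consumed = std::max(stats.max_memory_consumed, q.memory_consumed);
        stats.total_memory_consumed += q.memory_consumed;
      }
      // Keep only the top_k heaviest queries, heaviest first.
      std::sort(queries.begin(), queries.end(),
          [](const HeavyMemoryQuery& a, const HeavyMemoryQuery& b) {
            return a.memory_consumed > b.memory_consumed;
          });
      if (queries.size() > top_k) queries.resize(top_k);
      stats.heavy_memory_queries = std::move(queries);
      return stats;
    }

    int main() {
      PoolStatsLike stats = SummarizePool(
          {{8 << 20, "q1"}, {2 << 20, "q2"}, {16 << 20, "q3"}}, 2);
      std::cout << stats.heavy_memory_queries[0].query_id << " total="
                << stats.total_memory_consumed << std::endl;
      return 0;
    }
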
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 390b2ce..f0f9505 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -334,7 +334,7 @@ error_codes = (
   ("ADMISSION_REJECTED", 107, "Rejected query from pool $0: $1"),
 
   ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in pool $1. "
-     "Queued reason: $2"),
+          "Queued reason: $2 Additional Details: $3"),
 
   ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"),
 
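
The updated ADMISSION_TIMED_OUT template now carries four positional arguments. The
standalone snippet below only illustrates how the $0..$3 placeholders get filled in; the
backend's own error-message machinery performs the real substitution, and the example
argument values are hypothetical.

    #include <iostream>
    #include <string>
    #include <vector>

    // Naive positional substitution of "$0", "$1", ... (illustration only).
    std::string Fill(std::string tmpl, const std::vector<std::string>& args) {
      for (size_t i = 0; i < args.size(); ++i) {
        const std::string token = "$" + std::to_string(i);
        for (size_t pos = tmpl.find(token); pos != std::string::npos;
             pos = tmpl.find(token, pos + args[i].size())) {
          tmpl.replace(pos, token.size(), args[i]);
        }
      }
      return tmpl;
    }

    int main() {
      const std::string tmpl = "Admission for query exceeded timeout $0ms in pool $1. "
          "Queued reason: $2 Additional Details: $3";
      std::cout << Fill(tmpl, {"1000", "default-pool",
          "Not enough aggregate memory available in pool default-pool.",
          "Aggregated stats for pool: ..."}) << std::endl;
      return 0;
    }
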
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index a84862b..39e690c 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -887,6 +887,65 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
+        pool_max_mem=10 * 1024 * 1024, proc_mem_limit=2 * 1024 * 1024,
+        queue_wait_timeout_ms=1000),
+      statestored_args=_STATESTORED_ARGS)
+  def test_timeout_reason_host_memory(self):
+    """Test that queue details appear in the profile when queued and then timed out
+    due to a small 2MB host memory limit configuration."""
+    # Run a bunch of queries with mem_limit set so that only one can be admitted
+    # immediately. The rest should be queued and then dequeued (timed out) due to host
+    # memory pressure.
+    STMT = "select sleep(100)"
+    TIMEOUT_S = 20
+    NUM_QUERIES = 5
+    profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
+        TIMEOUT_S, {'mem_limit': '2mb'}, True)
+
+    EXPECTED_REASON = """.*Admission for query exceeded timeout 1000ms in pool """\
+             """default-pool.*"""\
+             """Not enough memory available on host.*"""\
+             """Stats for host.*"""\
+             """topN_query_stats.*"""\
+             """all_query_stats:.*"""
+    num_reasons = len([profile for profile in profiles
+         if re.search(EXPECTED_REASON, profile, re.DOTALL)])
+    assert num_reasons >= 1, \
+        "At least one query should have been timed out with topN query details: " +\
+        '\n===\n'.join(profiles)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
+        pool_max_mem=2 * 1024 * 1024, proc_mem_limit=20 * 1024 * 1024,
+        queue_wait_timeout_ms=1000),
+      statestored_args=_STATESTORED_ARGS)
+  def test_timeout_reason_pool_memory(self):
+    """Test that queue details appear in the profile when queued and then timed out
+    due to a small 2MB pool memory limit configuration."""
+    # Run a bunch of queries with mem_limit set so that only one can be admitted
+    # immediately. The rest should be queued and then dequeued (timed out) due to pool
+    # memory pressure.
+    STMT = "select sleep(100)"
+    TIMEOUT_S = 20
+    NUM_QUERIES = 5
+    profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
+        TIMEOUT_S, {'mem_limit': '2mb'}, True)
+
+    EXPECTED_REASON = """.*Admission for query exceeded timeout 1000ms in pool """\
+            """default-pool.*"""\
+            """Not enough aggregate memory available in pool default-pool.*"""\
+            """Aggregated stats for pool.*"""\
+            """topN_query_stats.*"""
+    num_reasons = len([profile for profile in profiles
+         if re.search(EXPECTED_REASON, profile, re.DOTALL)])
+    assert num_reasons >= 1, \
+        "At least one query should have been timed out with topN query details: " +\
+        '\n===\n'.join(profiles)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=100, max_queued=10,
           pool_max_mem=-1, admission_control_slots=4,
           executor_groups="default-pool-group1"),