Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/21 19:39:55 UTC

[impala] 07/13: IMPALA-6969: add AC last queued reason to profile

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bce16750fe763d1a8b50d4e3ffc32718eacea1f2
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Jun 14 18:10:48 2018 -0700

    IMPALA-6969: add AC last queued reason to profile
    
    The reason is updated during initial admission and when the query is at
    the head of the queue but can't be admitted. It is not updated while
    the query is in the middle of the queue.
    
    Together with the async admission change, this makes it possible to
    determine from the profile why the query has not been admitted yet.
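
    For example, a query that was queued and later admitted would end up with
    summary profile entries along the following lines (the wait time shown is
    made up; the key names and value formats are the ones defined in
    admission-controller.cc below):

        Admission result: Admitted (queued)
        Initial admission queue reason: waited 412 ms, reason: queue is not empty (size 2); queued queries are executed first
        Latest admission queue reason: number of running queries 1 is at or over limit 1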
    
    Testing:
    Added admission control tests that check that the queue reason string
    is set in the profile for queries queued based on both the query count
    and the max memory.
    
    Looped the tests overnight to confirm non-flakiness.
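
    As a rough, self-contained sketch of the kind of check these tests make
    (plain Python; the profile fragment is hypothetical, and the regex mirrors
    INITIAL_QUEUE_REASON_REGEX from test_admission_controller.py below):

        import re

        # Matches the profile entry recording the initial queue reason.
        INITIAL_QUEUE_REASON_REGEX = \
            "Initial admission queue reason: waited [0-9]* ms, reason: .*"

        # Hypothetical fragment of a queued query's runtime profile.
        profile = (
            "Admission result: Admitted (queued)\n"
            "Initial admission queue reason: waited 412 ms, reason: queue is "
            "not empty (size 2); queued queries are executed first\n"
            "Latest admission queue reason: number of running queries 1 is "
            "at or over limit 1\n")

        # A queued query's profile should carry the initial queue reason.
        match = re.search(INITIAL_QUEUE_REASON_REGEX, profile)
        assert match is not None
        print(match.group(0))

    The real tests additionally count how many queries were queued for each
    distinct reason (non-empty queue vs. hitting the query or memory limit).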
    
    Change-Id: Ida9b75dc50dfb7a27f59deda91bad6ac838130a1
    Reviewed-on: http://gerrit.cloudera.org:8080/10731
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc         |  47 ++++++----
 be/src/scheduling/admission-controller.h          |  14 ++-
 tests/custom_cluster/test_admission_controller.py | 105 ++++++++++++++++++++++
 3 files changed, 148 insertions(+), 18 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 9a8d2ba..48b3910 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -107,8 +107,9 @@ const string PROFILE_INFO_VAL_CANCELLED_IN_QUEUE= "Cancelled (queued)";
 const string PROFILE_INFO_VAL_ADMIT_QUEUED = "Admitted (queued)";
 const string PROFILE_INFO_VAL_REJECTED = "Rejected";
 const string PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)";
-const string PROFILE_INFO_KEY_QUEUE_DETAIL = "Admission queue details";
-const string PROFILE_INFO_VAL_QUEUE_DETAIL = "waited $0 ms, reason: $1";
+const string PROFILE_INFO_KEY_INITIAL_QUEUE_REASON = "Initial admission queue reason";
+const string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON = "waited $0 ms, reason: $1";
+const string PROFILE_INFO_KEY_LAST_QUEUED_REASON = "Latest admission queue reason";
 
 // Error status string details
 const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
@@ -147,7 +148,7 @@ const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
 
 // Queue decision details
 // $0 = num running queries, $1 = num queries limit
-const string QUEUED_NUM_RUNNING = "number of running queries $0 is over limit $1";
+const string QUEUED_NUM_RUNNING = "number of running queries $0 is at or over limit $1";
 // $0 = queue size
 const string QUEUED_QUEUE_NOT_EMPTY = "queue is not empty (size $0); queued queries are "
     "executed first";
@@ -536,7 +537,7 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
   const int64_t max_mem = pool_cfg.max_mem_resources;
 
   // Note the queue_node will not exist in the queue when this method returns.
-  QueueNode queue_node(*schedule, admit_outcome);
+  QueueNode queue_node(*schedule, admit_outcome, schedule->summary_profile());
   string not_admitted_reason;
 
   schedule->query_events()->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
@@ -603,10 +604,12 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
 
   // Update the profile info before waiting. These properties will be updated with
   // their final state after being dequeued.
-  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-      PROFILE_INFO_VAL_QUEUED);
-  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_QUEUE_DETAIL,
-      not_admitted_reason);
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_QUEUED);
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, not_admitted_reason);
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_LAST_QUEUED_REASON, not_admitted_reason);
   schedule->query_events()->MarkEvent(QUERY_EVENT_QUEUED);
 
   int64_t queue_wait_timeout_ms = FLAGS_queue_wait_timeout_ms;
@@ -621,8 +624,9 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule,
   bool timed_out;
   admit_outcome->Get(queue_wait_timeout_ms, &timed_out);
   int64_t wait_time_ms = MonotonicMillis() - wait_start_ms;
-  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_QUEUE_DETAIL,
-      Substitute(PROFILE_INFO_VAL_QUEUE_DETAIL, wait_time_ms, not_admitted_reason));
+  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
+      Substitute(
+          PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms, not_admitted_reason));
 
   SleepIfSetInDebugOptions(schedule->query_options(), SLEEP_AFTER_ADMISSION_OUTCOME_MS);
 
@@ -888,6 +892,7 @@ void AdmissionController::DequeueLoop() {
       PoolStatsMap::iterator it = pool_stats_.find(pool_name);
       DCHECK(it != pool_stats_.end());
       PoolStats* stats = &it->second;
+      RequestQueue& queue = request_queue_map_[pool_name];
 
       if (stats->local_stats().num_queued == 0) continue; // Nothing to dequeue
 
@@ -906,7 +911,13 @@ void AdmissionController::DequeueLoop() {
       int64_t max_to_dequeue = 0;
       if (max_requests > 0) {
         const int64_t total_available = max_requests - stats->agg_num_running();
-        if (total_available <= 0) continue;
+        if (total_available <= 0) {
+          if (!queue.empty()) {
+            LogDequeueFailed(queue.head(),
+                Substitute(QUEUED_NUM_RUNNING, stats->agg_num_running(), max_requests));
+          }
+          continue;
+        }
         // Use the ratio of locally queued requests to agg queued so that each impalad
         // can dequeue a proportional amount total_available. Note, this offers no
         // fairness between impalads.
@@ -920,8 +931,6 @@ void AdmissionController::DequeueLoop() {
       } else {
         max_to_dequeue = stats->agg_num_queued(); // No limit on num running requests
       }
-
-      RequestQueue& queue = request_queue_map_[pool_name];
       VLOG_RPC << "Dequeue thread will try to admit " << max_to_dequeue << " requests"
                << ", pool=" << pool_name << ", num_queued="
                << stats->local_stats().num_queued;
@@ -937,8 +946,7 @@ void AdmissionController::DequeueLoop() {
         // better policy once we have better test scenarios.
         if (!is_cancelled
             && !CanAdmitRequest(schedule, pool_config, true, &not_admitted_reason)) {
-          VLOG_QUERY << "Could not dequeue query id=" << PrintId(schedule.query_id())
-                     << " reason: " << not_admitted_reason;
+          LogDequeueFailed(queue_node, not_admitted_reason);
           break;
         }
         VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id());
@@ -961,6 +969,15 @@ void AdmissionController::DequeueLoop() {
   }
 }
 
+void AdmissionController::LogDequeueFailed(QueueNode* node,
+    const string& not_admitted_reason) {
+  VLOG_QUERY << "Could not dequeue query id="
+             << PrintId(node->schedule.query_id())
+             << " reason: " << not_admitted_reason;
+  node->profile->AddInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON,
+      not_admitted_reason);
+}
+
 AdmissionController::PoolStats*
 AdmissionController::GetPoolStats(const string& pool_name) {
   PoolStatsMap::iterator it = pool_stats_.find(pool_name);
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index b00c650..e5ffd99 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -391,14 +391,18 @@ class AdmissionController {
   /// ClientRequestState object associated with them.
   struct QueueNode : public InternalQueue<QueueNode>::Node {
     QueueNode(const QuerySchedule& query_schedule,
-        Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome)
-      : schedule(query_schedule), admit_outcome(admission_outcome) {}
+        Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
+        RuntimeProfile* profile)
+      : schedule(query_schedule), admit_outcome(admission_outcome), profile(profile) {}
 
     /// The query schedule of the queued request.
     const QuerySchedule& schedule;
 
     /// The Admission outcome of the queued request.
-    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome;
+    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
+
+    /// Profile to be updated with information about admission.
+    RuntimeProfile* const profile;
   };
 
   /// Queue for the queries waiting to be admitted for execution. Once the
@@ -477,6 +481,10 @@ class AdmissionController {
 
   /// Gets or creates the PoolStats for pool_name. Must hold admission_ctrl_lock_.
   PoolStats* GetPoolStats(const std::string& pool_name);
+
+  /// Log the reason that dequeueing 'node' failed and add that reason to the query's
+  /// profile. Must hold admission_ctrl_lock_.
+  void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);
 };
 
 }
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 209991b..57d0dc2 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -143,6 +143,11 @@ QUERY_END_BEHAVIORS = ['EOS', 'CLIENT_CANCEL', 'QUERY_TIMEOUT', 'CLIENT_CLOSE']
 # The timeout used for the QUERY_TIMEOUT end behaviour
 QUERY_END_TIMEOUT_S = 1
 
+# Regex that matches the first part of the profile info string added when a query is
+# queued.
+INITIAL_QUEUE_REASON_REGEX = \
+    "Initial admission queue reason: waited [0-9]* ms, reason: .*"
+
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
     proc_mem_limit = None, queue_wait_timeout_ms=None):
   extra_flags = ""
@@ -248,6 +253,35 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     HS2TestSuite.check_response(get_profile_resp)
     self.__check_query_options(get_profile_resp.profile, expected_options)
 
+  def _execute_and_collect_profiles(self, queries, timeout_s, config_options={}):
+    """Submit the query statements in 'queries' in parallel to the first impalad in
+    the cluster. After submission, the results are fetched from the queries in
+    sequence and their profiles are collected. Wait for up to timeout_s for
+    each query to finish. Returns the profile strings."""
+    client = self.cluster.impalads[0].service.create_beeswax_client()
+    try:
+      handles = []
+      profiles = []
+      client.set_configuration(config_options)
+      for query in queries:
+        handles.append(client.execute_async(query))
+      for query, handle in zip(queries, handles):
+        self._wait_for_state(client, handle, client.QUERY_STATES['FINISHED'], timeout_s)
+        results = self.client.fetch(query, handle)
+        profiles.append(self.client.get_runtime_profile(handle))
+      return profiles
+    finally:
+      client.close()
+
+  def _wait_for_state(self, client, handle, expected_state, timeout):
+    """Try to fetch 'expected_state' from 'client' within 'timeout' seconds.
+    Fail if unable."""
+    start_time = time()
+    actual_state = client.get_state(handle)
+    while (actual_state != expected_state and time() - start_time < timeout):
+      actual_state = client.get_state(handle)
+    assert expected_state == actual_state
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_config_args(\
@@ -558,6 +592,77 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     finally:
       client.close()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
+          pool_max_mem=1024 * 1024 * 1024),
+      statestored_args=_STATESTORED_ARGS)
+  def test_queue_reasons_num_queries(self):
+    """Test that queue details appear in the profile when queued based on num_queries."""
+    # Run a bunch of queries - one should get admitted immediately, the rest should
+    # be dequeued one-by-one.
+    STMT = "select sleep(100)"
+    TIMEOUT_S = 60
+    EXPECTED_REASON = \
+        "Latest admission queue reason: number of running queries 1 is at or over limit 1"
+    NUM_QUERIES = 5
+    profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
+        TIMEOUT_S)
+
+    num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
+    assert num_reasons == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    init_queue_reasons = self.__extract_init_queue_reasons(profiles)
+    assert len(init_queue_reasons) == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    over_limit_details = [detail
+        for detail in init_queue_reasons if 'number of running queries' in detail]
+    assert len(over_limit_details) == 1, \
+        "One query initially queued because of num_queries: " + '\n===\n'.join(profiles)
+    queue_not_empty_details = [detail
+        for detail in init_queue_reasons if 'queue is not empty' in detail]
+    assert len(queue_not_empty_details) == NUM_QUERIES - 2, \
+        "Others queued because of non-empty queue: " + '\n===\n'.join(profiles)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=10, max_queued=10,
+          pool_max_mem=10 * 1024 * 1024),
+      statestored_args=_STATESTORED_ARGS)
+  def test_queue_reasons_memory(self):
+    """Test that queue details appear in the profile when queued based on memory."""
+    # Run a bunch of queries with mem_limit set so that only one can be admitted at a
+    # time; one should get admitted immediately, the rest should be dequeued one-by-one.
+    STMT = "select sleep(100)"
+    TIMEOUT_S = 60
+    EXPECTED_REASON = "Latest admission queue reason: Not enough aggregate memory " +\
+        "available in pool default-pool with max mem resources 10.00 MB. Needed " +\
+        "9.00 MB but only 1.00 MB was available."
+    NUM_QUERIES = 5
+    profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
+        TIMEOUT_S, {'mem_limit':'9mb'})
+
+    num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
+    assert num_reasons == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    init_queue_reasons = self.__extract_init_queue_reasons(profiles)
+    assert len(init_queue_reasons) == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    over_limit_details = [detail for detail in init_queue_reasons
+        if 'Not enough aggregate memory available' in detail]
+    assert len(over_limit_details) == 1, \
+        "One query initially queued because of memory: " + '\n===\n'.join(profiles)
+    queue_not_empty_details = [detail
+        for detail in init_queue_reasons if 'queue is not empty' in detail]
+    assert len(queue_not_empty_details) == NUM_QUERIES - 2, \
+        "Others queued because of non-empty queue: " + '\n===\n'.join(profiles)
+
+  def __extract_init_queue_reasons(self, profiles):
+    """Return a list of the 'Admission Queue details' strings found in 'profiles'"""
+    matches = [re.search(INITIAL_QUEUE_REASON_REGEX, profile) for profile in profiles]
+    return [match.group(0) for match in matches if match is not None]
+
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin