You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/10/25 19:12:41 UTC

[impala] branch master updated: IMPALA-8995: Fix synchronization in dequeue thread

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e26b7f  IMPALA-8995: Fix synchronization in dequeue thread
9e26b7f is described below

commit 9e26b7f25abbe147bc19cf94b01607bb37663364
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Oct 23 17:49:48 2019 -0700

    IMPALA-8995: Fix synchronization in dequeue thread
    
    The admission controller's dequeue thread currently wakes up either
    when queries release their admission resources or when a statestore
    update is received. The dequeue loop releases the admission lock at
    the end of each iteration, then reacquires it and waits on the
    dequeue condition variable. In this small window, a query can
    complete, update the admission stats while holding the admission
    lock, and then send a notification to wake the dequeue thread. But
    since the dequeue thread has not called wait yet, it can miss this
    notification. Moreover, if the statestore is down, there is no other
    way of waking it up. This will cause the queued queries to
    eventually time out. This patch attempts to fix this by removing
    that window. The dequeue loop now acquires the admission lock once,
    before entering the loop. Every notifier sets a new pending_dequeue_
    flag before notifying. The loop only waits while that flag is unset,
    so a notification sent while the loop is not waiting is no longer
    lost.
    
    Testing:
    Reproduced this manually by adding a sleep right before the dequeue
    loop acquires the admission lock.
    
    Change-Id: I91080ce54e59cc7e6361f7c50d6b2156a8a180c8
    Reviewed-on: http://gerrit.cloudera.org:8080/14539
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc | 11 +++++++++--
 be/src/scheduling/admission-controller.h  |  4 ++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 2f335bc..798abb4 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -324,6 +324,7 @@ AdmissionController::~AdmissionController() {
     // Lock to ensure the dequeue thread will see the update to done_
     lock_guard<mutex> l(admission_ctrl_lock_);
     done_ = true;
+    pending_dequeue_ = true;
     dequeue_cv_.NotifyOne();
   }
   dequeue_thread_->Join();
@@ -992,6 +993,7 @@ void AdmissionController::ReleaseQuery(
     UpdateExecGroupMetric(schedule.executor_group(), -1);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne();
 }
@@ -1021,6 +1023,7 @@ void AdmissionController::ReleaseQueryBackends(
          << GetPoolStats(schedule)->DebugString();
       VLOG(2) << ss.str();
     }
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne();
 }
@@ -1055,6 +1058,7 @@ void AdmissionController::UpdatePoolStats(
     }
     UpdateClusterAggregates();
     last_topic_update_time_ms_ = MonotonicMillis();
+    pending_dequeue_ = true;
   }
   dequeue_cv_.NotifyOne(); // Dequeue and admit queries on the dequeue thread
 }
@@ -1369,10 +1373,13 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
 }
 
 void AdmissionController::DequeueLoop() {
+  unique_lock<mutex> lock(admission_ctrl_lock_);
   while (true) {
-    unique_lock<mutex> lock(admission_ctrl_lock_);
     if (done_) break;
-    dequeue_cv_.Wait(lock);
+    while (!pending_dequeue_) {
+      dequeue_cv_.Wait(lock);
+    }
+    pending_dequeue_ = false;
     ClusterMembershipMgr::SnapshotPtr membership_snapshot =
         cluster_membership_mgr_->GetSnapshot();
 
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e106751..3b93d3f 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -732,6 +732,10 @@ class AdmissionController {
   typedef boost::unordered_map<std::string, TPoolConfig> PoolConfigMap;
   PoolConfigMap pool_config_map_;
 
+  /// Indicates whether a change in pool stats warrants an attempt by the dequeuing
+  /// thread to dequeue.
+  bool pending_dequeue_ = true;
+
   /// Notifies the dequeuing thread that pool stats have changed and it may be
   /// possible to dequeue and admit queries.
   ConditionVariable dequeue_cv_;