Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/17 05:15:58 UTC

[impala] branch master updated: IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new aa6d788  IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
aa6d788 is described below

commit aa6d7887eec1efd33c73f77e5346a499569e5b6b
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Tue Jun 16 11:57:05 2020 -0700

    IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
    
    In BufferedPlanRootSink::FlushFinal(), if Cancel() runs
    before FlushFinal() waits on the consumer_eos_ condition variable,
    the thread in FlushFinal() will wait forever. This is because it is
    not checking for cancellation or synchronizing with the Cancel()
    thread.
    
    Specifically:
    Thread A: Calls BufferedPlanRootSink::Cancel(), signalling any
      thread currently waiting on the consumer_eos_ condition variable.
      No thread is waiting yet, so the signal has no effect.
    Thread B: Enters FlushFinal(). Never tests RuntimeState::is_cancelled()
      and calls Wait() on the consumer_eos_ condition variable. This waits
      forever.
    
    This changes BufferedPlanRootSink::Cancel() to get the lock_ before
    signalling the consumer_eos_ condition variable. It also changes
    FlushFinal() to call Wait() in a loop. It breaks out of the loop if
    it is cancelled or the batch_queue_ is empty. There are two cases:
    1. FlushFinal() gets the lock_ first and only releases it when waiting
    on the consumer_eos_ condition variable. It will get signalled by
    Cancel().
    2. Cancel() gets the lock_ first and FlushFinal() will not wait,
    because is_cancelled() is true.
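
    The two cases above can be illustrated with a minimal standalone sketch.
    It is not the Impala code: SinkSketch, DrainQueue(), queue_empty_ and the
    helper functions are hypothetical names, and std::mutex and
    std::condition_variable are assumed as stand-ins for the lock_ and
    ConditionVariable types used in the backend.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the sink; only the locking pattern is modelled.
class SinkSketch {
 public:
  // Producer side: blocks until the queue is drained or the sink is cancelled.
  // Returns true if it stopped waiting because of cancellation.
  bool FlushFinal() {
    std::unique_lock<std::mutex> l(lock_);
    // Re-check both conditions under the lock on every wakeup.
    while (!cancelled_.load() && !queue_empty_) {
      consumer_eos_.wait(l);
    }
    return cancelled_.load();
  }

  // Consumer side: marks the queue as drained and wakes the producer.
  void DrainQueue() {
    {
      std::lock_guard<std::mutex> l(lock_);
      queue_empty_ = true;
    }
    consumer_eos_.notify_all();
  }

  void Cancel() {
    cancelled_.store(true);
    // Take and drop the lock to synchronize with FlushFinal(): either it is
    // already waiting and receives the notification below, or it acquires the
    // lock afterwards and sees cancelled_ == true.
    { std::lock_guard<std::mutex> l(lock_); }
    consumer_eos_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable consumer_eos_;
  std::atomic<bool> cancelled_{false};
  bool queue_empty_ = false;
};

// Case 2: Cancel() completes before FlushFinal() starts; FlushFinal() must not wait.
bool CancelBeforeFlush() {
  SinkSketch sink;
  sink.Cancel();
  return sink.FlushFinal();
}

// Case 1 (most likely interleaving): FlushFinal() is waiting when Cancel() runs.
bool CancelDuringFlush() {
  SinkSketch sink;
  std::thread canceller([&sink] {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    sink.Cancel();
  });
  bool cancelled = sink.FlushFinal();
  canceller.join();
  return cancelled;
}

// Runs both cases; each call returns only if FlushFinal() did not hang.
void RunSketchChecks() {
  assert(CancelBeforeFlush());
  assert(CancelDuringFlush());
}
```

    Calling RunSketchChecks() exercises both orderings. Without the loop
    condition on cancelled_, the CancelBeforeFlush() case would block forever,
    which is the hang described above.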
    
    Testing:
     - Run core tests
    
    Change-Id: Id6f3fbc05420ca95313fa79ea106547feb92b16b
    Reviewed-on: http://gerrit.cloudera.org:8080/16088
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 277eac6..6dd69dd 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -105,8 +105,9 @@ Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
   // If no batches are ever added, wake up the consumer thread so it can check the
   // SenderState and return appropriately.
   rows_available_.NotifyAll();
-  // Wait until the consumer has read all rows from the batch_queue_.
-  {
+  // Wait until the consumer has read all rows from the batch_queue_ or this has
+  // been cancelled.
+  while (!IsCancelledOrClosed(state) && !IsQueueEmpty(state)) {
     SCOPED_TIMER(profile()->inactive_timer());
     consumer_eos_.Wait(l);
   }
@@ -136,6 +137,14 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
 
 void BufferedPlanRootSink::Cancel(RuntimeState* state) {
   DCHECK(state->is_cancelled());
+  // Get the lock_ to synchronize with FlushFinal(). Either FlushFinal() will be waiting
+  // on the consumer_eos_ condition variable and get signalled below, or it will see
+  // that is_cancelled() is true after it gets the lock. Drop the lock before
+  // signalling the CV so that a blocked thread can immediately acquire the mutex when
+  // it wakes up.
+  {
+    unique_lock<mutex> l(lock_);
+  }
   // Wake up all sleeping threads so they can check the cancellation state.
   // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
   // ensure that all sleeping threads are awoken. The calls to NotifyAll() are not on the