Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/17 05:15:58 UTC
[impala] branch master updated: IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new aa6d788 IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
aa6d788 is described below
commit aa6d7887eec1efd33c73f77e5346a499569e5b6b
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Tue Jun 16 11:57:05 2020 -0700
IMPALA-9842: Fix hang when cancelling BufferedPlanRootSink
In BufferedPlanRootSink::FlushFinal(), if Cancel() runs
before FlushFinal() waits on the consumer_eos_ condition variable,
the thread in FlushFinal() will wait forever. This is because it is
not checking for cancellation or synchronizing with the Cancel()
thread.
Specifically:
Thread A: Calls BufferedPlanRootSink::Cancel(), signalling any
thread currently waiting on the consumer_eos_ condition variable.
No thread is waiting yet, so the signal is lost.
Thread B: Enters FlushFinal(). Never tests RuntimeState::is_cancelled()
and calls Wait() on the consumer_eos_ condition variable. This waits
forever.
This changes BufferedPlanRootSink::Cancel() to get the lock_ before
signalling the consumer_eos_ condition variable. It also changes
FlushFinal() to call Wait() in a loop. It breaks out of the loop if
it is cancelled or the batch_queue_ is empty. There are two cases:
1. FlushFinal() gets the lock_ first and only releases it when waiting
on the consumer_eos_ condition variable. It will get signalled by
Cancel().
2. Cancel() gets the lock_ first and FlushFinal() will not wait,
because is_cancelled() is true.
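The two cases above can be sketched in standalone C++. This is only an illustration under stated assumptions, not the Impala code: it uses std::condition_variable in place of Impala's ConditionVariable wrapper, and SinkSketch, queued_rows_, AddRows(), ConsumeAll() and RunCancelRace() are hypothetical names invented for the example.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the sink; not the real BufferedPlanRootSink.
class SinkSketch {
 public:
  // Blocks until the queue is drained or the sink is cancelled.
  // Returns true if the wait ended because of cancellation.
  bool FlushFinal() {
    std::unique_lock<std::mutex> l(lock_);
    // Re-check the predicate in a loop so that a cancellation that happened
    // before we started waiting is never missed.
    while (!cancelled_.load() && queued_rows_ > 0) {
      consumer_eos_.wait(l);
    }
    return cancelled_.load();
  }

  void Cancel() {
    // The flag is set before the lock is taken, mirroring how the real
    // Cancel() is only called once is_cancelled() is already true.
    cancelled_.store(true);
    // Acquire and release the lock to synchronize with FlushFinal(): either
    // it is already waiting (and is woken below), or it has not checked the
    // flag yet (and will see it set once it holds the lock).
    { std::unique_lock<std::mutex> l(lock_); }
    consumer_eos_.notify_all();
  }

  // Assumed helpers that stand in for the producer and consumer sides.
  void AddRows(int n) {
    std::unique_lock<std::mutex> l(lock_);
    queued_rows_ += n;
  }
  void ConsumeAll() {
    {
      std::unique_lock<std::mutex> l(lock_);
      queued_rows_ = 0;
    }
    consumer_eos_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable consumer_eos_;
  std::atomic<bool> cancelled_{false};
  int queued_rows_ = 0;
};

// Races FlushFinal() against Cancel() with rows that are never consumed,
// so cancellation is the only way for FlushFinal() to return.
bool RunCancelRace() {
  SinkSketch sink;
  sink.AddRows(3);
  bool saw_cancel = false;
  std::thread producer([&] { saw_cancel = sink.FlushFinal(); });
  std::thread canceller([&] { sink.Cancel(); });
  producer.join();
  canceller.join();
  return saw_cancel;
}
```

Whichever thread takes the lock first, RunCancelRace() is expected to terminate. Without the empty critical section in Cancel(), the notification could fire between the predicate check and the wait in FlushFinal(), which is the hang described above.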
Testing:
- Run core tests
Change-Id: Id6f3fbc05420ca95313fa79ea106547feb92b16b
Reviewed-on: http://gerrit.cloudera.org:8080/16088
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/buffered-plan-root-sink.cc | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 277eac6..6dd69dd 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -105,8 +105,9 @@ Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
// If no batches are ever added, wake up the consumer thread so it can check the
// SenderState and return appropriately.
rows_available_.NotifyAll();
- // Wait until the consumer has read all rows from the batch_queue_.
- {
+ // Wait until the consumer has read all rows from the batch_queue_ or this has
+ // been cancelled.
+ while (!IsCancelledOrClosed(state) && !IsQueueEmpty(state)) {
SCOPED_TIMER(profile()->inactive_timer());
consumer_eos_.Wait(l);
}
@@ -136,6 +137,14 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
void BufferedPlanRootSink::Cancel(RuntimeState* state) {
DCHECK(state->is_cancelled());
+ // Get the lock_ to synchronize with FlushFinal(). Either FlushFinal() will be waiting
+ // on the consumer_eos_ condition variable and get signalled below, or it will see
+ // that is_cancelled() is true after it gets the lock. Drop the lock before
+ // signalling the CV so that a blocked thread can immediately acquire the mutex when
+ // it wakes up.
+ {
+ unique_lock<mutex> l(lock_);
+ }
// Wake up all sleeping threads so they can check the cancellation state.
// While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
// ensure that all sleeping threads are awoken. The calls to NotifyAll() are not on the