You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:12 UTC
[drill] 08/08: DRILL-7000: Queries failing with 'Failed to
aggregate or route the RFW' do not complete
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit b557b796dc1ca7796b0db956e39df1d52f212f12
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Jan 23 17:38:44 2019 -0800
DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete
closes #1621
---
.../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index f69a44e..c0eceae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable
joinMjId2Stopwatch.put(joinMjId, stopwatch);
}
synchronized (rfQueue) {
+ if (!running.get()) {
+ runtimeFilterWritable.close();
+ return;
+ }
rfQueue.add(runtimeFilterWritable);
rfQueue.notify();
}
@@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable
aggregate(toAggregate);
} catch (Exception ex) {
logger.error("Failed to aggregate or route the RFW", ex);
+
+ // Set running to false and cleanup pending RFW in queue. This will make sure producer
+ // thread is also indicated to stop and queue is cleaned up properly in failure cases
+ synchronized (rfQueue) {
+ running.set(false);
+ }
+ cleanupQueue();
throw new DrillRuntimeException(ex);
} finally {
- if (toAggregate != null) {
toAggregate.close();
- }
}
}
+ cleanupQueue();
+ }
+ private void cleanupQueue() {
if (!running.get()) {
RuntimeFilterWritable toClose;
while ((toClose = rfQueue.poll()) != null) {