You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:12 UTC

[drill] 08/08: DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit b557b796dc1ca7796b0db956e39df1d52f212f12
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Jan 23 17:38:44 2019 -0800

    DRILL-7000: Queries failing with 'Failed to aggregate or route the RFW' do not complete
    
    closes #1621
---
 .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index f69a44e..c0eceae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -99,6 +99,10 @@ public class RuntimeFilterSink implements Closeable
       joinMjId2Stopwatch.put(joinMjId, stopwatch);
     }
     synchronized (rfQueue) {
+      if (!running.get()) {
+        runtimeFilterWritable.close();
+        return;
+      }
       rfQueue.add(runtimeFilterWritable);
       rfQueue.notify();
     }
@@ -246,14 +250,22 @@ public class RuntimeFilterSink implements Closeable
           aggregate(toAggregate);
         } catch (Exception ex) {
           logger.error("Failed to aggregate or route the RFW", ex);
+
+          // Set running to false and clean up pending RFWs in the queue. This signals the producer
+          // thread to stop as well and ensures the queue is cleaned up properly in failure cases.
+          synchronized (rfQueue) {
+            running.set(false);
+          }
+          cleanupQueue();
           throw new DrillRuntimeException(ex);
         } finally {
-          if (toAggregate != null) {
             toAggregate.close();
-          }
         }
       }
+      cleanupQueue();
+    }
 
+    private void cleanupQueue() {
       if (!running.get()) {
         RuntimeFilterWritable toClose;
         while ((toClose = rfQueue.poll()) != null) {