You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:43 UTC

[drill] 04/05: DRILL-6731: Resolving race conditions in RuntimeFilterSink. Add condition variable to avoid starvation of producer thread while acquiring queue lock

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit de76e135316086386e2f7edd04ec1d5ca479bc59
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Wed Sep 26 13:20:02 2018 -0700

    DRILL-6731: Resolving race conditions in RuntimeFilterSink
    Add condition variable to avoid starvation of producer thread while acquiring queue lock
---
 .../physical/visitor/RuntimeFilterVisitor.java     |  12 +-
 .../drill/exec/work/filter/RuntimeFilterSink.java  | 127 +++++++++++++++------
 .../exec/work/filter/RuntimeFilterWritable.java    |  36 +++---
 3 files changed, 114 insertions(+), 61 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index bfba5f2..fcfa2bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -222,18 +222,10 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
           holder.setFromBuildSide(true);
           right.accept(this, holder);
           boolean routeToForeman = holder.needToRouteToForeman();
-          if (!routeToForeman) {
-            runtimeFilterDef.setSendToForeman(false);
-          } else {
-            runtimeFilterDef.setSendToForeman(true);
-          }
+          runtimeFilterDef.setSendToForeman(routeToForeman);
           List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
           for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-            if (!routeToForeman) {
-              bloomFilterDef.setLocal(true);
-            } else {
-              bloomFilterDef.setLocal(false);
-            }
+            bloomFilterDef.setLocal(!routeToForeman);
           }
         }
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 8f4c823..754c68e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -38,12 +39,28 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   private int staleBookId = 0;
 
+  /**
+   * RuntimeFilterWritable holding the aggregated version of all the received filters
+   */
   private RuntimeFilterWritable aggregated = null;
 
   private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
+  /**
+   * Flag used by the Minor Fragment thread to indicate that it has encountered an error
+   */
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  /**
+   * Lock used to synchronize the producer (Netty thread) and the consumer (AsyncAggregateThread) of this queue's
+   * elements. It is needed because, in an error condition, the producer and consumer threads can observe the
+   * running flag at different times. Whichever thread sees it first takes this lock, clears all elements, and sets
+   * the queue to null to tell the producer not to put any new elements in it.
+   */
+  private ReentrantLock queueLock = new ReentrantLock();
+
+  private Condition notEmpty = queueLock.newCondition();
+
   private ReentrantLock aggregatedRFLock = new ReentrantLock();
 
   private Thread asyncAggregateThread;
@@ -62,24 +79,34 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
     if (running.get()) {
-      if (containOne()) {
-        boolean same = aggregated.same(runtimeFilterWritable);
-        if (!same) {
-          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-          //share the same FragmentContext.
-          try {
-            aggregatedRFLock.lock();
+      try {
+        aggregatedRFLock.lock();
+        if (containOne()) {
+          boolean same = aggregated.equals(runtimeFilterWritable);
+          if (!same) {
+            // This handles the single-fragment case, in which two RuntimeFilterRecordBatch instances
+            // share the same FragmentContext.
             aggregated.close();
-            aggregated = null;
-          } finally {
-            aggregatedRFLock.unlock();
+            currentBookId.set(0);
+            staleBookId = 0;
+            clearQueued(false);
           }
-          currentBookId.set(0);
-          staleBookId = 0;
-          clearQueued();
         }
+      } finally {
+        aggregatedRFLock.unlock();
+      }
+
+      try {
+        queueLock.lock();
+        if (rfQueue != null) {
+          rfQueue.add(runtimeFilterWritable);
+          notEmpty.signal();
+        } else {
+          runtimeFilterWritable.close();
+        }
+      } finally {
+        queueLock.unlock();
       }
-      rfQueue.add(runtimeFilterWritable);
     } else {
       runtimeFilterWritable.close();
     }
@@ -116,53 +143,77 @@ public class RuntimeFilterSink implements AutoCloseable {
     return aggregated != null;
   }
 
-  @Override
-  public void close() throws Exception {
+  private void doCleanup() {
     running.compareAndSet(true, false);
-    asyncAggregateThread.interrupt();
-    if (containOne()) {
-      try {
-        aggregatedRFLock.lock();
+    try {
+      aggregatedRFLock.lock();
+      if (containOne()) {
         aggregated.close();
-      } finally {
-        aggregatedRFLock.unlock();
+        aggregated = null;
       }
+    } finally {
+      aggregatedRFLock.unlock();
     }
-    clearQueued();
   }
 
-  private void clearQueued() {
+  @Override
+  public void close() throws Exception {
+    asyncAggregateThread.interrupt();
+    doCleanup();
+  }
+
+  private void clearQueued(boolean setToNull) {
     RuntimeFilterWritable toClear;
-    while ((toClear = rfQueue.poll()) != null) {
-      toClear.close();
+    try {
+      queueLock.lock();
+      while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
+        toClear.close();
+      }
+      rfQueue = (setToNull) ? null : rfQueue;
+    } finally {
+      queueLock.unlock();
     }
   }
 
-  class AsyncAggregateWorker implements Runnable {
+  private class AsyncAggregateWorker implements Runnable {
 
     @Override
     public void run() {
       try {
+        RuntimeFilterWritable toAggregate = null;
         while (running.get()) {
-          RuntimeFilterWritable toAggregate = rfQueue.take();
-          if (!running.get()) {
-            toAggregate.close();
-            return;
+          try {
+            queueLock.lock();
+            toAggregate = (rfQueue != null) ? rfQueue.poll() :  null;
+            if (toAggregate == null) {
+              notEmpty.await();
+              continue;
+            }
+          } finally {
+            queueLock.unlock();
           }
-          if (containOne()) {
-            try {
-              aggregatedRFLock.lock();
+
+          try {
+            aggregatedRFLock.lock();
+            if (containOne()) {
               aggregated.aggregate(toAggregate);
-            } finally {
-              aggregatedRFLock.unlock();
+
+              // Release the byteBuf referenced by toAggregate since aggregate will not do it
+              toAggregate.close();
+            } else {
+              aggregated = toAggregate;
             }
-          } else {
-            aggregated = toAggregate;
+          } finally {
+            aggregatedRFLock.unlock();
           }
           currentBookId.incrementAndGet();
         }
       } catch (InterruptedException e) {
         logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+        Thread.currentThread().interrupt();
+      } finally {
+        doCleanup();
+        clearQueued(true);
       }
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 302a480..9a971e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -36,9 +36,14 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
 
   private DrillBuf[] data;
 
+  private String identifier;
+
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
+    this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
 
@@ -90,7 +95,7 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       int capacity = src.readableBytes();
       DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
       int readerIndex = src.readerIndex();
-      src.readBytes(duplicateOne, 0, capacity);
+      duplicateOne.writeBytes(src);
       src.readerIndex(readerIndex);
       cloned[i] = duplicateOne;
       i++;
@@ -98,19 +103,25 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
     return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
-  public boolean same(RuntimeFilterWritable other) {
-    BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
-    int otherMajorId = runtimeFilterDef.getMajorFragmentId();
-    int otherMinorId = runtimeFilterDef.getMinorFragmentId();
-    int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
-    int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
-    int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
-    int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
-    return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  public String toString() {
+    return identifier;
   }
 
-  public String toString() {
-    return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof RuntimeFilterWritable) {
+      RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other;
+      return this.identifier.equals(otherRFW.identifier);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return identifier.hashCode();
   }
 
   @Override
@@ -119,5 +130,4 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
       buf.release();
     }
   }
-
 }