You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:44 UTC

[drill] 05/05: DRILL-6731: use thread pool to run the runtime filter aggregating work closes #1459

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 0a3ebc24d941f75841dcb2a6251e22d2a988c46c
Author: weijie.tong <we...@alipay.com>
AuthorDate: Tue Oct 9 20:28:22 2018 +0800

    DRILL-6731: use thread pool to run the runtime filter aggregating work
    closes #1459
---
 .../org/apache/drill/exec/ops/FragmentContextImpl.java   | 10 ++++++----
 .../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++--------
 .../test/java/org/apache/drill/test/OperatorFixture.java |  3 ++-
 .../org/apache/drill/test/PhysicalOpUnitTestBase.java    |  2 +-
 4 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 1f9d489..fcfdc8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -209,7 +209,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
-    this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
+    boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    if (enableRF) {
+      ExecutorService executorService = context.getExecutor();
+      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+    }
   }
 
   /**
@@ -472,9 +476,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterSink != null) {
-      suppressingClose(runtimeFilterSink);
-    }
+    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 754c68e..1468625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -18,10 +18,11 @@
 package org.apache.drill.exec.work.filter;
 
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -63,18 +64,17 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   private ReentrantLock aggregatedRFLock = new ReentrantLock();
 
-  private Thread asyncAggregateThread;
-
   private BufferAllocator bufferAllocator;
 
+  private Future future;
+
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
     this.bufferAllocator = bufferAllocator;
     AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-    asyncAggregateThread.start();
+    future = executorService.submit(asyncAggregateWorker);
   }
 
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
@@ -158,7 +158,7 @@ public class RuntimeFilterSink implements AutoCloseable {
 
   @Override
   public void close() throws Exception {
-    asyncAggregateThread.interrupt();
+    future.cancel(true);
     doCleanup();
   }
 
@@ -209,7 +209,7 @@ public class RuntimeFilterSink implements AutoCloseable {
           currentBookId.incrementAndGet();
         }
       } catch (InterruptedException e) {
-        logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+        logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
         Thread.currentThread().interrupt();
       } finally {
         doCleanup();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 81d0d1a..a1e7d0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -81,6 +81,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -197,7 +198,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
-      this.runtimeFilterSink = new RuntimeFilterSink(allocator);
+      this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 559f7f4..300e88b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -209,7 +209,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
-      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
+      this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
     }
 
     @Override