You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:44 UTC
[drill] 05/05: DRILL-6731: Use a thread pool to run the runtime
filter aggregating work. Closes #1459
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 0a3ebc24d941f75841dcb2a6251e22d2a988c46c
Author: weijie.tong <we...@alipay.com>
AuthorDate: Tue Oct 9 20:28:22 2018 +0800
DRILL-6731: Use a thread pool to run the runtime filter aggregating work.
Closes #1459
---
.../org/apache/drill/exec/ops/FragmentContextImpl.java | 10 ++++++----
.../apache/drill/exec/work/filter/RuntimeFilterSink.java | 16 ++++++++--------
.../test/java/org/apache/drill/test/OperatorFixture.java | 3 ++-
.../org/apache/drill/test/PhysicalOpUnitTestBase.java | 2 +-
4 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 1f9d489..fcfdc8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -209,7 +209,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
constantValueHolderCache = Maps.newHashMap();
- this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
+ boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+ if (enableRF) {
+ ExecutorService executorService = context.getExecutor();
+ this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+ }
}
/**
@@ -472,9 +476,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
- if (runtimeFilterSink != null) {
- suppressingClose(runtimeFilterSink);
- }
+ suppressingClose(runtimeFilterSink);
suppressingClose(bufferManager);
suppressingClose(allocator);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 754c68e..1468625 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -18,10 +18,11 @@
package org.apache.drill.exec.work.filter;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,18 +64,17 @@ public class RuntimeFilterSink implements AutoCloseable {
private ReentrantLock aggregatedRFLock = new ReentrantLock();
- private Thread asyncAggregateThread;
-
private BufferAllocator bufferAllocator;
+ private Future future;
+
private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
- public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+ public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
this.bufferAllocator = bufferAllocator;
AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
- asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
- asyncAggregateThread.start();
+ future = executorService.submit(asyncAggregateWorker);
}
public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
@@ -158,7 +158,7 @@ public class RuntimeFilterSink implements AutoCloseable {
@Override
public void close() throws Exception {
- asyncAggregateThread.interrupt();
+ future.cancel(true);
doCleanup();
}
@@ -209,7 +209,7 @@ public class RuntimeFilterSink implements AutoCloseable {
currentBookId.incrementAndGet();
}
} catch (InterruptedException e) {
- logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+ logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
Thread.currentThread().interrupt();
} finally {
doCleanup();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 81d0d1a..a1e7d0d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -81,6 +81,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Test fixture for operator and (especially) "sub-operator" tests.
@@ -197,7 +198,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
this.controls = new ExecutionControls(options);
compiler = new CodeCompiler(config, options);
bufferManager = new BufferManagerImpl(allocator);
- this.runtimeFilterSink = new RuntimeFilterSink(allocator);
+ this.runtimeFilterSink = new RuntimeFilterSink(allocator, Executors.newCachedThreadPool());
}
private static FunctionImplementationRegistry newFunctionRegistry(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 559f7f4..300e88b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -209,7 +209,7 @@ public class PhysicalOpUnitTestBase extends ExecTest {
public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
- this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
+ this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator(), Executors.newCachedThreadPool());
}
@Override