You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/09 19:02:49 UTC
[tinkerpop] branch tp4 updated: More work on ParallelRxJava thread
pools. I also added SimpleRemoteParallelTest to check to make sure
MachineServer + multi-threaded traversal execution play well together. They
do.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 53d5623 More work on ParallelRxJava thread pools. I also added SimpleRemoteParallelTest to check to make sure MachineServer + multi-threaded traversal execution play well together. They do.
53d5623 is described below
commit 53d5623e34436a6debfc2adfde58a5a385b0724d
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 9 13:02:40 2019 -0600
More work on ParallelRxJava thread pools. I also added SimpleRemoteParallelTest to check to make sure MachineServer + multi-threaded traversal execution play well together. They do.
---
.../tinkerpop/machine/AbstractTestSuite.java | 15 +++++++++++---
.../machine/processor/beam/SimpleRemoteTest.java | 15 ++++++++------
.../machine/processor/pipes/SimpleRemoteTest.java | 15 ++++++++------
.../machine/processor/rxjava/ParallelRxJava.java | 11 +++++++++--
.../machine/processor/rxjava/RxJavaProcessor.java | 15 +++++++-------
.../processor/rxjava/strategy/RxJavaStrategy.java | 8 ++++----
.../machine/processor/rxjava/RxJavaBenchmark.java | 4 ++--
.../processor/rxjava/SimpleLocalParallelTest.java | 2 +-
...rialTest.java => SimpleRemoteParallelTest.java} | 23 +++++++++++++---------
.../processor/rxjava/SimpleRemoteSerialTest.java | 15 ++++++++------
10 files changed, 77 insertions(+), 46 deletions(-)
diff --git a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
index 8524292..822636e 100644
--- a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
+++ b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
@@ -40,16 +40,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class AbstractTestSuite<C> {
protected Machine machine;
- protected TraversalSource<C> g;
+ TraversalSource<C> g;
- public AbstractTestSuite(final Machine machine, final Bytecode<C> source) {
+ AbstractTestSuite(final Machine machine, final Bytecode<C> source) {
this.machine = machine;
this.g = Gremlin.traversal(machine);
- BytecodeUtil.mergeSourceInstructions(source, TraversalUtil.getBytecode(g));
+ BytecodeUtil.mergeSourceInstructions(source, TraversalUtil.getBytecode(this.g));
}
@AfterAll
public void shutdown() {
+ this.g.close();
this.machine.close();
}
@@ -69,4 +70,12 @@ public abstract class AbstractTestSuite<C> {
assertFalse(traversal.hasNext());
}
+ public static void sleep(final int time) {
+ try {
+ Thread.sleep(time);
+ } catch (final InterruptedException e) {
+ // do nothing
+ }
+ }
+
}
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
index 5b3448a..72fac52 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.tinkerpop.machine.processor.beam;
+import org.apache.tinkerpop.machine.AbstractTestSuite;
import org.apache.tinkerpop.machine.SimpleTestSuite;
import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
import org.apache.tinkerpop.machine.species.remote.MachineServer;
import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import java.util.Map;
@@ -46,14 +48,15 @@ public class SimpleRemoteTest extends SimpleTestSuite {
super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
}
+ @AfterEach
+ void delayShutdown() {
+ AbstractTestSuite.sleep(100);
+ }
+
@AfterAll
- static void stopServer() {
+ void stopServer() {
SERVER.close();
- try {
- Thread.sleep(10);
- } catch (final InterruptedException e) {
-
- }
+ AbstractTestSuite.sleep(100);
}
}
\ No newline at end of file
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
index 5e0b32b..f895ada 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.tinkerpop.machine.processor.pipes;
+import org.apache.tinkerpop.machine.AbstractTestSuite;
import org.apache.tinkerpop.machine.SimpleTestSuite;
import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
import org.apache.tinkerpop.machine.species.remote.MachineServer;
import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -41,14 +43,15 @@ class SimpleRemoteTest extends SimpleTestSuite {
super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
}
+ @AfterEach
+ void delayShutdown() {
+ AbstractTestSuite.sleep(100);
+ }
+
@AfterAll
- static void stopServer() {
+ void stopServer() {
SERVER.close();
- try {
- Thread.sleep(10);
- } catch (final InterruptedException e) {
-
- }
+ AbstractTestSuite.sleep(100);
}
}
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 9c9476d..daa8bd4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -22,6 +22,7 @@ import io.reactivex.Flowable;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.function.BarrierFunction;
import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -47,11 +48,15 @@ import java.util.concurrent.ExecutorService;
*/
public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
- private ExecutorService threadPool;
+ private final ExecutorService threadPool;
+ private final String bytecodeId;
ParallelRxJava(final Compilation<C, S, E> compilation, final ExecutorService threadPool) {
super(compilation);
this.threadPool = threadPool;
+ this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
+ (String) BytecodeUtil.getSourceInstructions(compilation.getBytecode(), RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
+ null;
}
@Override
@@ -66,8 +71,10 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
sequential().
doFinally(() -> {
this.alive.set(Boolean.FALSE);
- if (this.compilation.getBytecode().getParent().isEmpty()) // only the parent compilation should close the thread pool
+ if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
this.threadPool.shutdown();
+ RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
+ }
}).
blockingSubscribe(); // thread this so results can be received before computation completes
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
index e9792eb..23346f5 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -32,15 +32,16 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class RxJavaProcessor implements ProcessorFactory {
- public static final String RXJAVA_THREADS = "rxjava.threads";
- public static final String RX_BYCODE_ID = "rx:bytecodeId";
- private static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
+ public static final String RX_THREAD_POOL_SIZE = "rx.threadPool.size";
+ public static final String RX_ROOT_BYTECODE_ID = "rx:rootBytecodeId";
+ static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
private final Map<String, Object> configuration;
@@ -54,11 +55,11 @@ public final class RxJavaProcessor implements ProcessorFactory {
@Override
public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
- final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RXJAVA_THREADS, 0);
- final String id = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_BYCODE_ID).get(0).args()[0];
- final ExecutorService threadPool = threads > 0 ? RxJavaProcessor.THREAD_POOLS.computeIfAbsent(id, key -> Executors.newFixedThreadPool(threads)) : null;
+ final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RX_THREAD_POOL_SIZE, 0);
+ final String bytecodeId = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_ROOT_BYTECODE_ID).get(0).args()[0];
+ final ThreadPoolExecutor threadPool = threads > 0 ? (ThreadPoolExecutor) RxJavaProcessor.THREAD_POOLS.computeIfAbsent(bytecodeId, key -> Executors.newFixedThreadPool(threads)) : null;
// System.out.println(id + "::" + threads + "--" + threadPool);
- return null == threadPool ?
+ return null == threadPool || threadPool.getActiveCount() == threadPool.getMaximumPoolSize() ? // if the thread pool is saturated, serialize the processor
new SerialRxJava<>(compilation) :
new ParallelRxJava<>(compilation, threadPool);
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
index e26ca67..4e9de21 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -41,10 +41,10 @@ public final class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStra
public <C> void apply(final Bytecode<C> bytecode) {
if (bytecode.getParent().isEmpty()) { // root bytecode
final String id = UUID.randomUUID().toString();
- bytecode.addSourceInstruction(RxJavaProcessor.RX_BYCODE_ID, id);
+ bytecode.addSourceInstruction(RxJavaProcessor.RX_ROOT_BYTECODE_ID, id);
} else if (!BytecodeUtil.hasSourceInstruction(bytecode, CommonCompiler.Symbols.WITH_PROCESSOR)) {
if (RxJavaStrategy.isSimple(bytecode)) {
- bytecode.addSourceInstruction(CommonCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 0)); // guaranteed serial execution
+ bytecode.addSourceInstruction(CommonCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, 0)); // guaranteed serial execution
} else {
final Bytecode<C> root = BytecodeUtil.getRootBytecode(bytecode);
final List<SourceInstruction> processors = BytecodeUtil.getSourceInstructions(root, CommonCompiler.Symbols.WITH_PROCESSOR); // potential parallel execution
@@ -57,9 +57,9 @@ public final class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStra
private static boolean isSimple(final Bytecode<?> bytecode) {
final CompositeCompiler compiler = BytecodeUtil.getCompilers(bytecode);
- return bytecode.getInstructions().size() < 4 && bytecode.getInstructions().stream().noneMatch(i -> {
+ return bytecode.getInstructions().size() < 5 && bytecode.getInstructions().stream().noneMatch(i -> {
final FunctionType functionType = compiler.getFunctionType(i.op());
- return FunctionType.FLATMAP == functionType; //|| FunctionType.BRANCH == functionType;
+ return FunctionType.FLATMAP == functionType || FunctionType.BRANCH == functionType;
});
}
}
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
index 5dc9331..5ec8fc2 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -35,11 +35,11 @@ import java.util.Map;
*/
class RxJavaBenchmark {
- @Test
+ //@Test
void benchmark() {
final Machine machine = LocalMachine.open();
final TraversalSource ser = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
- final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
+ final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors() - 1));
final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
final List<Long> input = new ArrayList<>(1000);
for (long i = 0; i < 5000; i++) {
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
index ad578c5..46b37d2 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
@@ -33,7 +33,7 @@ public class SimpleLocalParallelTest extends SimpleTestSuite {
private final static Bytecode<Long> BYTECODE = new Bytecode<>();
static {
- BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors()-1));
+ BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors()-1));
}
SimpleLocalParallelTest() {
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
similarity index 76%
copy from java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
copy to java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
index d6ba73f..11daedf 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
@@ -18,37 +18,42 @@
*/
package org.apache.tinkerpop.machine.processor.rxjava;
+import org.apache.tinkerpop.machine.AbstractTestSuite;
import org.apache.tinkerpop.machine.SimpleTestSuite;
import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
import org.apache.tinkerpop.machine.species.remote.MachineServer;
import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+
+import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SimpleRemoteSerialTest extends SimpleTestSuite {
+class SimpleRemoteParallelTest extends SimpleTestSuite {
private final static Bytecode<Long> BYTECODE = new Bytecode<>();
private static MachineServer SERVER = new MachineServer(7777);
static {
- BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+ BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors() - 1));
}
- SimpleRemoteSerialTest() {
+ SimpleRemoteParallelTest() {
super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
}
+ @AfterEach
+ void delayShutdown() {
+ AbstractTestSuite.sleep(100);
+ }
+
@AfterAll
- static void stopServer() {
+ void stopServer() {
SERVER.close();
- try {
- Thread.sleep(10);
- } catch (final InterruptedException e) {
-
- }
+ AbstractTestSuite.sleep(100);
}
}
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
index d6ba73f..8582bb8 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.tinkerpop.machine.processor.rxjava;
+import org.apache.tinkerpop.machine.AbstractTestSuite;
import org.apache.tinkerpop.machine.SimpleTestSuite;
import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
import org.apache.tinkerpop.machine.species.remote.MachineServer;
import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -41,14 +43,15 @@ public class SimpleRemoteSerialTest extends SimpleTestSuite {
super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
}
+ @AfterEach
+ void delayShutdown() {
+ AbstractTestSuite.sleep(100);
+ }
+
@AfterAll
- static void stopServer() {
+ void stopServer() {
SERVER.close();
- try {
- Thread.sleep(10);
- } catch (final InterruptedException e) {
-
- }
+ AbstractTestSuite.sleep(100);
}
}
\ No newline at end of file