You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/09 17:51:51 UTC

[tinkerpop] branch tp4 updated: thread pool hell. I have it so RxJavaStrategy analyzes nested compilations to see if they are 'simple' or not. if they are simple, then a serial flow is used to execute it. This way, we don't exhaust the threadpool with overly threaded nests of compilations. RxJavaStrategy is the first complex strategy. It uses the new Bytecode hiearchy to assoicate a thread pool with the root bytecode.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new ef0fd1f  thread pool hell. I have it so RxJavaStrategy analyzes nested compilations to see if they are 'simple' or not. if they are simple, then a serial flow is used to execute it. This way, we don't exhaust the threadpool with overly threaded nests of compilations. RxJavaStrategy is the first complex strategy. It uses the new Bytecode hiearchy to assoicate a thread pool with the root bytecode.
ef0fd1f is described below

commit ef0fd1f3a6f87939ebd063f14f078cf47750f3ac
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 9 11:51:42 2019 -0600

    thread pool hell. I have it so RxJavaStrategy analyzes nested compilations to see if they are 'simple' or not. if they are simple, then a serial flow is used to execute it. This way, we don't exhaust the threadpool with overly threaded nests of compilations. RxJavaStrategy is the first complex strategy. It uses the new Bytecode hiearchy to assoicate a thread pool with the root bytecode.
---
 .../machine/processor/rxjava/ParallelRxJava.java   |  2 +-
 .../machine/processor/rxjava/RxJavaProcessor.java  |  4 ++--
 .../machine/processor/rxjava/SerialRxJava.java     |  2 +-
 .../processor/rxjava/strategy/RxJavaStrategy.java  | 27 ++++++++++++++++------
 .../machine/processor/rxjava/RxJavaBenchmark.java  |  6 ++---
 .../processor/rxjava/SimpleLocalParallelTest.java  |  2 +-
 6 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 5b81368..9c9476d 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -64,8 +64,8 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                             runOn(Schedulers.from(this.threadPool)), this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
-                    doOnComplete(() -> this.alive.set(Boolean.FALSE)).
                     doFinally(() -> {
+                        this.alive.set(Boolean.FALSE);
                         if (this.compilation.getBytecode().getParent().isEmpty()) // only the parent compilation should close the thread pool
                             this.threadPool.shutdown();
                     }).
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
index be5d850..e9792eb 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -56,8 +56,8 @@ public final class RxJavaProcessor implements ProcessorFactory {
     public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
         final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RXJAVA_THREADS, 0);
         final String id = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_BYCODE_ID).get(0).args()[0];
-        final ExecutorService threadPool = RxJavaProcessor.THREAD_POOLS.compute(id, (key, value) -> null == value && threads > 0 ? Executors.newCachedThreadPool() : value);
-        // System.out.println(id + "--" + threadPool + "---" + THREAD_POOLS);
+        final ExecutorService threadPool = threads > 0 ? RxJavaProcessor.THREAD_POOLS.computeIfAbsent(id, key -> Executors.newFixedThreadPool(threads)) : null;
+        // System.out.println(id + "::" + threads + "--" + threadPool);
         return null == threadPool ?
                 new SerialRxJava<>(compilation) :
                 new ParallelRxJava<>(compilation, threadPool);
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index ace99b4..183f043 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -55,7 +55,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             this.alive.set(Boolean.TRUE);
             SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
-                    doOnComplete(() -> this.alive.set(Boolean.FALSE)).
+                    doFinally(() -> this.alive.set(Boolean.FALSE)).
                     subscribe();
         }
         while (this.alive.get() && this.ends.isEmpty()) {
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
index 7e03e6e..e26ca67 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -22,31 +22,44 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.bytecode.SourceInstruction;
 import org.apache.tinkerpop.machine.bytecode.compiler.CommonCompiler;
+import org.apache.tinkerpop.machine.bytecode.compiler.CompositeCompiler;
+import org.apache.tinkerpop.machine.bytecode.compiler.FunctionType;
 import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor;
 import org.apache.tinkerpop.machine.strategy.AbstractStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
 
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy {
-
+public final class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy {
 
     @Override
     public <C> void apply(final Bytecode<C> bytecode) {
-        if (bytecode.getParent().isEmpty()) {
+        if (bytecode.getParent().isEmpty()) { // root bytecode
             final String id = UUID.randomUUID().toString();
             bytecode.addSourceInstruction(RxJavaProcessor.RX_BYCODE_ID, id);
         } else if (!BytecodeUtil.hasSourceInstruction(bytecode, CommonCompiler.Symbols.WITH_PROCESSOR)) {
-            final Bytecode<C> root = BytecodeUtil.getRootBytecode(bytecode);
-            final List<SourceInstruction> processors = BytecodeUtil.getSourceInstructions(root, CommonCompiler.Symbols.WITH_PROCESSOR);
-            for (final SourceInstruction sourceInstruction : processors) {
-                bytecode.addSourceInstruction(sourceInstruction.op(), sourceInstruction.args());
+            if (RxJavaStrategy.isSimple(bytecode)) {
+                bytecode.addSourceInstruction(CommonCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 0)); // guaranteed serial execution
+            } else {
+                final Bytecode<C> root = BytecodeUtil.getRootBytecode(bytecode);
+                final List<SourceInstruction> processors = BytecodeUtil.getSourceInstructions(root, CommonCompiler.Symbols.WITH_PROCESSOR); // potential parallel execution
+                for (final SourceInstruction sourceInstruction : processors) {
+                    bytecode.addSourceInstruction(sourceInstruction.op(), sourceInstruction.args());
+                }
             }
         }
     }
 
+    private static boolean isSimple(final Bytecode<?> bytecode) {
+        final CompositeCompiler compiler = BytecodeUtil.getCompilers(bytecode);
+        return bytecode.getInstructions().size() < 4 && bytecode.getInstructions().stream().noneMatch(i -> {
+            final FunctionType functionType = compiler.getFunctionType(i.op());
+            return FunctionType.FLATMAP == functionType; //|| FunctionType.BRANCH == functionType;
+        });
+    }
 }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
index 94b49c1..5dc9331 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -36,14 +36,14 @@ import java.util.Map;
 class RxJavaBenchmark {
 
     @Test
-    public void benchmark() {
+    void benchmark() {
         final Machine machine = LocalMachine.open();
         final TraversalSource ser = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
         final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
         final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
         final List<Long> input = new ArrayList<>(1000);
-        for (long i = 0; i < 1000; i++) {
-            input.add(i+1);
+        for (long i = 0; i < 5000; i++) {
+            input.add(i + 1);
         }
         final int runs = 30;
         System.out.println("Threads used: " + (Runtime.getRuntime().availableProcessors() - 1));
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
index 3edd3fd..ad578c5 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
@@ -33,7 +33,7 @@ public class SimpleLocalParallelTest extends SimpleTestSuite {
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 20));
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors()-1));
     }
 
     SimpleLocalParallelTest() {