You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/09 19:02:49 UTC

[tinkerpop] branch tp4 updated: More work on ParallelRxJava thread pools. I also added SimpleRemoteParallelTest to check to make sure MachineServer + multi-threaded traversal execution play well together. They do.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 53d5623  More work on ParallelRxJava thread pools. I also added SimpleRemoteParallelTest to check to make sure MachineServer + multi-threaded traversal execution play well together. They do.
53d5623 is described below

commit 53d5623e34436a6debfc2adfde58a5a385b0724d
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 9 13:02:40 2019 -0600

    More work on ParallelRxJava thread pools. I also added SimpleRemoteParallelTest to check to make sure MachineServer + multi-threaded traversal execution play well together. They do.
---
 .../tinkerpop/machine/AbstractTestSuite.java       | 15 +++++++++++---
 .../machine/processor/beam/SimpleRemoteTest.java   | 15 ++++++++------
 .../machine/processor/pipes/SimpleRemoteTest.java  | 15 ++++++++------
 .../machine/processor/rxjava/ParallelRxJava.java   | 11 +++++++++--
 .../machine/processor/rxjava/RxJavaProcessor.java  | 15 +++++++-------
 .../processor/rxjava/strategy/RxJavaStrategy.java  |  8 ++++----
 .../machine/processor/rxjava/RxJavaBenchmark.java  |  4 ++--
 .../processor/rxjava/SimpleLocalParallelTest.java  |  2 +-
 ...rialTest.java => SimpleRemoteParallelTest.java} | 23 +++++++++++++---------
 .../processor/rxjava/SimpleRemoteSerialTest.java   | 15 ++++++++------
 10 files changed, 77 insertions(+), 46 deletions(-)

diff --git a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
index 8524292..822636e 100644
--- a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
+++ b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/AbstractTestSuite.java
@@ -40,16 +40,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public abstract class AbstractTestSuite<C> {
 
     protected Machine machine;
-    protected TraversalSource<C> g;
+    TraversalSource<C> g;
 
-    public AbstractTestSuite(final Machine machine, final Bytecode<C> source) {
+    AbstractTestSuite(final Machine machine, final Bytecode<C> source) {
         this.machine = machine;
         this.g = Gremlin.traversal(machine);
-        BytecodeUtil.mergeSourceInstructions(source, TraversalUtil.getBytecode(g));
+        BytecodeUtil.mergeSourceInstructions(source, TraversalUtil.getBytecode(this.g));
     }
 
     @AfterAll
     public void shutdown() {
+        this.g.close();
         this.machine.close();
     }
 
@@ -69,4 +70,12 @@ public abstract class AbstractTestSuite<C> {
         assertFalse(traversal.hasNext());
     }
 
+    public static void sleep(final int time) {
+        try {
+            Thread.sleep(time);
+        } catch (final InterruptedException e) {
+            // do nothing
+        }
+    }
+
 }
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
index 5b3448a..72fac52 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.tinkerpop.machine.processor.beam;
 
+import org.apache.tinkerpop.machine.AbstractTestSuite;
 import org.apache.tinkerpop.machine.SimpleTestSuite;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.species.remote.MachineServer;
 import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 
 import java.util.Map;
 
@@ -46,14 +48,15 @@ public class SimpleRemoteTest extends SimpleTestSuite {
         super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
     }
 
+    @AfterEach
+    void delayShutdown() {
+        AbstractTestSuite.sleep(100);
+    }
+
     @AfterAll
-    static void stopServer() {
+    void stopServer() {
         SERVER.close();
-        try {
-            Thread.sleep(10);
-        } catch (final InterruptedException e) {
-
-        }
+        AbstractTestSuite.sleep(100);
     }
 
 }
\ No newline at end of file
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
index 5e0b32b..f895ada 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.tinkerpop.machine.processor.pipes;
 
+import org.apache.tinkerpop.machine.AbstractTestSuite;
 import org.apache.tinkerpop.machine.SimpleTestSuite;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.species.remote.MachineServer;
 import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -41,14 +43,15 @@ class SimpleRemoteTest extends SimpleTestSuite {
         super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
     }
 
+    @AfterEach
+    void delayShutdown() {
+        AbstractTestSuite.sleep(100);
+    }
+
     @AfterAll
-    static void stopServer() {
+    void stopServer() {
         SERVER.close();
-        try {
-            Thread.sleep(10);
-        } catch (final InterruptedException e) {
-
-        }
+        AbstractTestSuite.sleep(100);
     }
 
 }
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 9c9476d..daa8bd4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -22,6 +22,7 @@ import io.reactivex.Flowable;
 import io.reactivex.parallel.ParallelFlowable;
 import io.reactivex.processors.PublishProcessor;
 import io.reactivex.schedulers.Schedulers;
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.function.BarrierFunction;
 import org.apache.tinkerpop.machine.function.BranchFunction;
@@ -47,11 +48,15 @@ import java.util.concurrent.ExecutorService;
  */
 public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
-    private ExecutorService threadPool;
+    private final ExecutorService threadPool;
+    private final String bytecodeId;
 
     ParallelRxJava(final Compilation<C, S, E> compilation, final ExecutorService threadPool) {
         super(compilation);
         this.threadPool = threadPool;
+        this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
+                (String) BytecodeUtil.getSourceInstructions(compilation.getBytecode(), RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
+                null;
     }
 
     @Override
@@ -66,8 +71,10 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                     sequential().
                     doFinally(() -> {
                         this.alive.set(Boolean.FALSE);
-                        if (this.compilation.getBytecode().getParent().isEmpty()) // only the parent compilation should close the thread pool
+                        if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
                             this.threadPool.shutdown();
+                            RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
+                        }
                     }).
                     blockingSubscribe(); // thread this so results can be received before computation completes
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
index e9792eb..23346f5 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -32,15 +32,16 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class RxJavaProcessor implements ProcessorFactory {
 
-    public static final String RXJAVA_THREADS = "rxjava.threads";
-    public static final String RX_BYCODE_ID = "rx:bytecodeId";
-    private static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
+    public static final String RX_THREAD_POOL_SIZE = "rx.threadPool.size";
+    public static final String RX_ROOT_BYTECODE_ID = "rx:rootBytecodeId";
+    static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
 
     private final Map<String, Object> configuration;
 
@@ -54,11 +55,11 @@ public final class RxJavaProcessor implements ProcessorFactory {
 
     @Override
     public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
-        final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RXJAVA_THREADS, 0);
-        final String id = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_BYCODE_ID).get(0).args()[0];
-        final ExecutorService threadPool = threads > 0 ? RxJavaProcessor.THREAD_POOLS.computeIfAbsent(id, key -> Executors.newFixedThreadPool(threads)) : null;
+        final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RX_THREAD_POOL_SIZE, 0);
+        final String bytecodeId = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_ROOT_BYTECODE_ID).get(0).args()[0];
+        final ThreadPoolExecutor threadPool = threads > 0 ? (ThreadPoolExecutor) RxJavaProcessor.THREAD_POOLS.computeIfAbsent(bytecodeId, key -> Executors.newFixedThreadPool(threads)) : null;
         // System.out.println(id + "::" + threads + "--" + threadPool);
-        return null == threadPool ?
+        return null == threadPool || threadPool.getActiveCount() == threadPool.getMaximumPoolSize() ? // if the thread pool is saturated, serialize the processor
                 new SerialRxJava<>(compilation) :
                 new ParallelRxJava<>(compilation, threadPool);
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
index e26ca67..4e9de21 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -41,10 +41,10 @@ public final class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStra
     public <C> void apply(final Bytecode<C> bytecode) {
         if (bytecode.getParent().isEmpty()) { // root bytecode
             final String id = UUID.randomUUID().toString();
-            bytecode.addSourceInstruction(RxJavaProcessor.RX_BYCODE_ID, id);
+            bytecode.addSourceInstruction(RxJavaProcessor.RX_ROOT_BYTECODE_ID, id);
         } else if (!BytecodeUtil.hasSourceInstruction(bytecode, CommonCompiler.Symbols.WITH_PROCESSOR)) {
             if (RxJavaStrategy.isSimple(bytecode)) {
-                bytecode.addSourceInstruction(CommonCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 0)); // guaranteed serial execution
+                bytecode.addSourceInstruction(CommonCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, 0)); // guaranteed serial execution
             } else {
                 final Bytecode<C> root = BytecodeUtil.getRootBytecode(bytecode);
                 final List<SourceInstruction> processors = BytecodeUtil.getSourceInstructions(root, CommonCompiler.Symbols.WITH_PROCESSOR); // potential parallel execution
@@ -57,9 +57,9 @@ public final class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStra
 
     private static boolean isSimple(final Bytecode<?> bytecode) {
         final CompositeCompiler compiler = BytecodeUtil.getCompilers(bytecode);
-        return bytecode.getInstructions().size() < 4 && bytecode.getInstructions().stream().noneMatch(i -> {
+        return bytecode.getInstructions().size() < 5 && bytecode.getInstructions().stream().noneMatch(i -> {
             final FunctionType functionType = compiler.getFunctionType(i.op());
-            return FunctionType.FLATMAP == functionType; //|| FunctionType.BRANCH == functionType;
+            return FunctionType.FLATMAP == functionType || FunctionType.BRANCH == functionType;
         });
     }
 }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
index 5dc9331..5ec8fc2 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -35,11 +35,11 @@ import java.util.Map;
  */
 class RxJavaBenchmark {
 
-    @Test
+    //@Test
     void benchmark() {
         final Machine machine = LocalMachine.open();
         final TraversalSource ser = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
-        final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
+        final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors() - 1));
         final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
         final List<Long> input = new ArrayList<>(1000);
         for (long i = 0; i < 5000; i++) {
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
index ad578c5..46b37d2 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
@@ -33,7 +33,7 @@ public class SimpleLocalParallelTest extends SimpleTestSuite {
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors()-1));
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors()-1));
     }
 
     SimpleLocalParallelTest() {
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
similarity index 76%
copy from java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
copy to java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
index d6ba73f..11daedf 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteParallelTest.java
@@ -18,37 +18,42 @@
  */
 package org.apache.tinkerpop.machine.processor.rxjava;
 
+import org.apache.tinkerpop.machine.AbstractTestSuite;
 import org.apache.tinkerpop.machine.SimpleTestSuite;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.species.remote.MachineServer;
 import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+
+import java.util.Map;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SimpleRemoteSerialTest extends SimpleTestSuite {
+class SimpleRemoteParallelTest extends SimpleTestSuite {
 
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
     private static MachineServer SERVER = new MachineServer(7777);
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class);
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RX_THREAD_POOL_SIZE, Runtime.getRuntime().availableProcessors() - 1));
     }
 
-    SimpleRemoteSerialTest() {
+    SimpleRemoteParallelTest() {
         super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
     }
 
+    @AfterEach
+    void delayShutdown() {
+        AbstractTestSuite.sleep(100);
+    }
+
     @AfterAll
-    static void stopServer() {
+    void stopServer() {
         SERVER.close();
-        try {
-            Thread.sleep(10);
-        } catch (final InterruptedException e) {
-
-        }
+        AbstractTestSuite.sleep(100);
     }
 
 }
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
index d6ba73f..8582bb8 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
@@ -18,12 +18,14 @@
  */
 package org.apache.tinkerpop.machine.processor.rxjava;
 
+import org.apache.tinkerpop.machine.AbstractTestSuite;
 import org.apache.tinkerpop.machine.SimpleTestSuite;
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
 import org.apache.tinkerpop.machine.species.remote.MachineServer;
 import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -41,14 +43,15 @@ public class SimpleRemoteSerialTest extends SimpleTestSuite {
         super(RemoteMachine.open(6666, "localhost", 7777), BYTECODE);
     }
 
+    @AfterEach
+    void delayShutdown() {
+        AbstractTestSuite.sleep(100);
+    }
+
     @AfterAll
-    static void stopServer() {
+    void stopServer() {
         SERVER.close();
-        try {
-            Thread.sleep(10);
-        } catch (final InterruptedException e) {
-
-        }
+        AbstractTestSuite.sleep(100);
     }
 
 }
\ No newline at end of file