You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/09 15:03:47 UTC

[tinkerpop] branch tp4 updated: nested Bytecode now knows about its parent bytecode. It is also possible to walk the Bytecode tree up to the root. This is used to share information between Compilations such as thread pool info in ParallelRxJava. Noticing a weird relationship between ProcessorFactory and Compilation. Need to think things through. Anywho, for now ParallelRxJava traversals share a common thread pool for their execution. Need to get smart and not thread simple nests so we don't spawn so many threads.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new ae6ac40  nested Bytecode now knows about its parent bytecode. It is also possible to walk the Bytecode tree up to the root. This is used to share information between Compilations such as thread pool info in ParallelRxJava. Noticing a weird relationship between ProcessorFactory and Compilation. Need to think things through. Anywho, for now ParallelRxJava traversals share a common thread pool for their execution. Need to get smart and not thread simple nests so we don't spawn so ma [...]
ae6ac40 is described below

commit ae6ac400ba1eb511ab52995dad430dda12365d99
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 9 09:03:34 2019 -0600

    nested Bytecode now knows about its parent bytecode. It is also possible to walk the Bytecode tree up to the root. This is used to share information between Compilations such as thread pool info in ParallelRxJava. Noticing a weird relationship between ProcessorFactory and Compilation. Need to think things through. Anywho, for now ParallelRxJava traversals share a common thread pool for their execution. Need to get smart and not thread simple nests so we don't spawn so many threads.
---
 .../tinkerpop/language/gremlin/TraversalUtil.java  |  5 +-
 .../language/gremlin/common/CommonTraversal.java   |  6 +--
 .../language/gremlin/core/CoreTraversal.java       |  6 +--
 .../tinkerpop/machine/bytecode/Bytecode.java       | 33 +++++++++++-
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   | 24 +++++++++
 .../tinkerpop/machine/bytecode/Instruction.java    | 16 ++----
 .../machine/bytecode/compiler/Compilation.java     |  8 +++
 .../tinkerpop/machine/util/StringFactory.java      |  5 ++
 .../tinkerpop/machine/bytecode/BytecodeTest.java   | 60 ++++++++++++++++++++++
 .../machine/processor/rxjava/ParallelRxJava.java   | 13 ++---
 .../machine/processor/rxjava/RxJavaProcessor.java  | 22 ++++++--
 .../processor/rxjava/strategy/RxJavaStrategy.java  | 20 ++++++--
 .../machine/processor/rxjava/RxJavaBenchmark.java  |  2 +-
 .../processor/rxjava/SimpleLocalParallelTest.java  |  2 +-
 14 files changed, 183 insertions(+), 39 deletions(-)

diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalUtil.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalUtil.java
index efdf824..3587e4c 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalUtil.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/TraversalUtil.java
@@ -45,9 +45,8 @@ public final class TraversalUtil {
     }
 
     public static <C, S, E> Traversal<C, S, E> insertRepeatInstruction(final AbstractTraversal<C, S, E> traversal, final char type, final Object argument) {
-        final Instruction<C> lastInstruction = traversal.bytecode.lastInstruction();
-        if (lastInstruction.op().equals(Symbols.REPEAT))
-            lastInstruction.addArgs(type, argument);
+        if (traversal.bytecode.lastInstruction().op().equals(Symbols.REPEAT))
+            traversal.bytecode.addArgs(type, argument);
         else
             traversal.addInstruction(Symbols.REPEAT, type, argument);
         return traversal;
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
index 7d44169..95aeb51 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/common/CommonTraversal.java
@@ -61,13 +61,13 @@ public class CommonTraversal<C, S, E> extends AbstractTraversal<C, S, E> {
 
     @Override
     public Traversal<C, S, E> by(final String byString) {
-        this.bytecode.lastInstruction().addArg(byString);
+        this.bytecode.addArgs(byString);
         return this;
     }
 
     @Override
     public Traversal<C, S, E> by(final Traversal<C, ?, ?> byTraversal) {
-        this.bytecode.lastInstruction().addArg(TraversalUtil.getBytecode(byTraversal));
+        this.bytecode.addArgs(TraversalUtil.getBytecode(byTraversal));
         return this;
     }
 
@@ -78,7 +78,7 @@ public class CommonTraversal<C, S, E> extends AbstractTraversal<C, S, E> {
 
     @Override
     public Traversal<C, S, E> by(final Traversal<C, ?, ?> byTraversal, final Order order) {
-        this.bytecode.lastInstruction().addArgs(TraversalUtil.getBytecode(byTraversal), order.name());
+        this.bytecode.addArgs(TraversalUtil.getBytecode(byTraversal), order.name());
         return this;
     }
 
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
index fdd6762..1b7d9e1 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/core/CoreTraversal.java
@@ -66,13 +66,13 @@ public class CoreTraversal<C, S, E> extends AbstractTraversal<C, S, E> {
 
     @Override
     public Traversal<C, S, E> by(final String byString) {
-        this.bytecode.lastInstruction().addArg(byString);
+        this.bytecode.addArgs(byString);
         return this;
     }
 
     @Override
     public Traversal<C, S, E> by(final Traversal<C, ?, ?> byTraversal) {
-        this.bytecode.lastInstruction().addArg(TraversalUtil.getBytecode(byTraversal));
+        this.bytecode.addArgs(TraversalUtil.getBytecode(byTraversal));
         return this;
     }
 
@@ -83,7 +83,7 @@ public class CoreTraversal<C, S, E> extends AbstractTraversal<C, S, E> {
 
     @Override
     public Traversal<C, S, E> by(final Traversal<C, ?, ?> byTraversal, final Order order) {
-        this.bytecode.lastInstruction().addArgs(TraversalUtil.getBytecode(byTraversal), order);
+        this.bytecode.addArgs(TraversalUtil.getBytecode(byTraversal), order);
         return this;
     }
 
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
index ba77f99..42216f5 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Bytecode.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -31,12 +32,15 @@ public final class Bytecode<C> implements Cloneable, Serializable {
 
     private List<SourceInstruction> sourceInstructions = new ArrayList<>();
     private List<Instruction<C>> instructions = new ArrayList<>();
+    private Bytecode<C> parent = null;
 
     public void addSourceInstruction(final String op, final Object... args) {
+        BytecodeUtil.linkBytecodeChildren(this, args);
         this.sourceInstructions.add(new SourceInstruction(op, args));
     }
 
     public void addUniqueSourceInstruction(final String op, final Object... args) {
+        BytecodeUtil.linkBytecodeChildren(this, args);
         this.sourceInstructions.removeIf(instruction -> instruction.op().equals(op));
         this.sourceInstructions.add(new SourceInstruction(op, args));
     }
@@ -45,9 +49,18 @@ public final class Bytecode<C> implements Cloneable, Serializable {
         return this.sourceInstructions;
     }
 
+    public Optional<Bytecode<C>> getParent() {
+        return Optional.ofNullable(this.parent);
+    }
+
+    public void setParent(final Bytecode<C> parent) {
+        this.parent = parent;
+    }
+
     ///
 
     public void addInstruction(final Coefficient<C> coefficient, final String op, final Object... args) {
+        BytecodeUtil.linkBytecodeChildren(this, args);
         this.instructions.add(new Instruction<>(coefficient, op, args));
     }
 
@@ -59,6 +72,16 @@ public final class Bytecode<C> implements Cloneable, Serializable {
         return this.instructions.get(this.instructions.size() - 1);
     }
 
+    public void addArgs(final Object... args) {
+        final Instruction<C> lastInstruction = this.lastInstruction();
+        BytecodeUtil.linkBytecodeChildren(this, args);
+        final Object[] oldArgs = lastInstruction.args();
+        final Object[] newArgs = new Object[oldArgs.length + args.length];
+        System.arraycopy(oldArgs, 0, newArgs, 0, oldArgs.length);
+        System.arraycopy(args, 0, newArgs, oldArgs.length, args.length);
+        lastInstruction.args = newArgs;
+    }
+
     @Override
     public int hashCode() {
         return this.sourceInstructions.hashCode() ^ this.instructions.hashCode();
@@ -80,8 +103,14 @@ public final class Bytecode<C> implements Cloneable, Serializable {
     public Bytecode<C> clone() {
         try {
             final Bytecode<C> clone = (Bytecode<C>) super.clone();
-            clone.sourceInstructions = new ArrayList<>(this.sourceInstructions);
-            clone.instructions = new ArrayList<>(this.instructions);
+            clone.sourceInstructions = new ArrayList<>(this.sourceInstructions.size());
+            clone.instructions = new ArrayList<>(this.instructions.size());
+            for (final SourceInstruction sourceInstruction : this.sourceInstructions) {
+                clone.addSourceInstruction(sourceInstruction.op(), sourceInstruction.args());
+            }
+            for (final Instruction<C> instruction : this.instructions) {
+                clone.addInstruction(instruction.coefficient(), instruction.op(), instruction.args());
+            }
             return clone;
         } catch (final CloneNotSupportedException e) {
             throw new RuntimeException(e.getMessage(), e);
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index 5272ce7..1e2ab57 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -144,6 +144,15 @@ public final class BytecodeUtil {
         return false;
     }
 
+    public static List<SourceInstruction> getSourceInstructions(final Bytecode<?> bytecode, final String op) {
+        final List<SourceInstruction> sourceInstructions = new ArrayList<>();
+        for (final SourceInstruction sourceInstruction : bytecode.getSourceInstructions()) {
+            if (sourceInstruction.op().equals(op))
+                sourceInstructions.add(sourceInstruction);
+        }
+        return sourceInstructions;
+    }
+
     public static <C> void replaceInstruction(final Bytecode<C> bytecode, final Instruction<C> oldInstruction, final Instruction<C> newInstruction) {
         int index = bytecode.getInstructions().indexOf(oldInstruction);
         bytecode.getInstructions().remove(index);
@@ -154,6 +163,21 @@ public final class BytecodeUtil {
         to.getSourceInstructions().addAll(0, from.getSourceInstructions());
     }
 
+    public static <C> Bytecode<C> getRootBytecode(final Bytecode<C> child) {
+        Bytecode<C> root = child;
+        while (root.getParent().isPresent()) {
+            root = root.getParent().get();
+        }
+        return root;
+    }
+
+    public static <C> void linkBytecodeChildren(final Bytecode<C> parent, final Object... args) {
+        for (final Object arg : args) {
+            if (arg instanceof Bytecode)
+                ((Bytecode<C>) arg).setParent(parent);
+        }
+    }
+
     public static <C> Optional<TraverserFactory<C>> getTraverserFactory(final Bytecode<C> bytecode) {
         // TODO: make this real
         for (final Instruction<C> instruction : bytecode.getInstructions()) {
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
index 48088b8..430479d 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/Instruction.java
@@ -31,7 +31,7 @@ public final class Instruction<C> implements Serializable {
 
     private final Coefficient<C> coefficient;
     private final String op;
-    private Object[] args;
+    Object[] args;
     private String label = null;
 
     public Instruction(final Coefficient<C> coefficient, final String op, final Object... args) {
@@ -60,18 +60,6 @@ public final class Instruction<C> implements Serializable {
         this.label = label;
     }
 
-    public void addArg(final Object arg) {
-        final Object[] newArgs = new Object[this.args.length + 1];
-        System.arraycopy(this.args, 0, newArgs, 0, this.args.length);
-        newArgs[newArgs.length - 1] = arg;
-        this.args = newArgs;
-    }
-
-    public void addArgs(final Object... args) {
-        for (final Object arg : args) {
-            this.addArg(arg);
-        }
-    }
 
     @Override
     public int hashCode() {
@@ -93,4 +81,6 @@ public final class Instruction<C> implements Serializable {
     public String toString() {
         return StringFactory.makeInstructionString(this);
     }
+
+
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
index 6745a68..32a04e4 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
@@ -40,6 +40,7 @@ import java.util.List;
  */
 public final class Compilation<C, S, E> implements Serializable, Cloneable {
 
+    private final Bytecode<C> bytecode;
     private List<CFunction<C>> functions;
     private final StructureFactory structureFactory;
     private final ProcessorFactory processorFactory;
@@ -47,6 +48,7 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
     private transient Processor<C, S, E> processor;
 
     public Compilation(final Bytecode<C> bytecode) {
+        this.bytecode = bytecode;
         BytecodeUtil.strategize(bytecode);
         this.structureFactory = BytecodeUtil.getStructureFactory(bytecode).orElse(EmptyStructure.instance());
         this.processorFactory = BytecodeUtil.getProcessorFactory(bytecode).orElse(EmptyProcessor.instance());
@@ -55,6 +57,7 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
     }
 
     public Compilation(final SourceCompilation<C> source, final Bytecode<C> bytecode) {
+        this.bytecode = bytecode;
         BytecodeUtil.mergeSourceInstructions(source.getSourceCode(), bytecode);
         BytecodeUtil.strategize(bytecode, source.getStrategies());
         this.structureFactory = source.getStructureFactory();
@@ -64,6 +67,7 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
     }
 
     public Compilation(final ProcessorFactory processorFactory) {
+        this.bytecode = new Bytecode<>();
         this.structureFactory = EmptyStructure.instance();
         this.processorFactory = processorFactory;
         this.traverserFactory = null;
@@ -128,6 +132,10 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
         return this.traverserFactory;
     }
 
+    public Bytecode<C> getBytecode() {
+        return this.bytecode;
+    }
+
     @Override
     public String toString() {
         return this.functions.toString();
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/StringFactory.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/StringFactory.java
index deae893..573bb12 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/StringFactory.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/StringFactory.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.machine.bytecode.Instruction;
 import org.apache.tinkerpop.machine.bytecode.SourceInstruction;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.CFunction;
+import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
 import java.util.ArrayList;
@@ -98,4 +99,8 @@ public final class StringFactory {
     public static String makeTraverserString(final Traverser<?, ?> traverser) {
         return traverser.coefficient().toString() + traverser.object();
     }
+
+    public static String makeProcessorFactoryString(final ProcessorFactory processorFactory) {
+        return processorFactory.getClass().getSimpleName();
+    }
 }
diff --git a/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/BytecodeTest.java b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/BytecodeTest.java
new file mode 100644
index 0000000..9653d52
--- /dev/null
+++ b/java/machine/machine-core/src/test/java/org/apache/tinkerpop/machine/bytecode/BytecodeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.bytecode;
+
+import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+class BytecodeTest {
+
+    @Test
+    void shouldHaveParents() {
+        Bytecode<Long> parent = new Bytecode<>();
+        assertFalse(parent.getParent().isPresent());
+        Bytecode<Long> child = new Bytecode<>();
+        parent.addInstruction(LongCoefficient.create(), "test", child);
+        assertTrue(child.getParent().isPresent());
+        assertEquals(parent, child.getParent().get());
+        assertFalse(child.getParent().get().getParent().isPresent());
+    }
+
+    @Test
+    void shouldCloneParents() {
+        Bytecode<Long> parent = new Bytecode<>();
+        assertFalse(parent.getParent().isPresent());
+        Bytecode<Long> child = new Bytecode<>();
+        parent.addInstruction(LongCoefficient.create(), "test", child);
+
+        Bytecode<Long> cloneParent = parent.clone();
+        Bytecode<Long> cloneChild = (Bytecode<Long>) cloneParent.lastInstruction().args()[0];
+        assertTrue(cloneChild.getParent().isPresent());
+        assertEquals(cloneParent, cloneChild.getParent().get());
+        assertFalse(cloneParent.getParent().isPresent());
+        cloneParent.addArgs("hello");
+        assertNotEquals(cloneParent, parent);
+    }
+}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 2ee8b35..5b81368 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -41,19 +41,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
-    private final int threads;
     private ExecutorService threadPool;
 
-    ParallelRxJava(final Compilation<C, S, E> compilation, final int threads) {
+    ParallelRxJava(final Compilation<C, S, E> compilation, final ExecutorService threadPool) {
         super(compilation);
-        this.threads = threads;
+        this.threadPool = threadPool;
     }
 
     @Override
@@ -61,15 +59,18 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
         if (!this.executed) {
             this.executed = true;
             this.alive.set(Boolean.TRUE);
-            this.threadPool = Executors.newFixedThreadPool(this.threads);
             this.compile(
                     ParallelFlowable.from(Flowable.fromIterable(this.starts)).
                             runOn(Schedulers.from(this.threadPool)), this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    doFinally(this.threadPool::shutdown).
+                    doFinally(() -> {
+                        if (this.compilation.getBytecode().getParent().isEmpty()) // only the parent compilation should close the thread pool
+                            this.threadPool.shutdown();
+                    }).
                     blockingSubscribe(); // thread this so results can be received before computation completes
+
         }
     }
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
index b940e51..be5d850 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java
@@ -18,15 +18,20 @@
  */
 package org.apache.tinkerpop.machine.processor.rxjava;
 
+import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.processor.rxjava.strategy.RxJavaStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
+import org.apache.tinkerpop.machine.util.StringFactory;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -34,6 +39,8 @@ import java.util.Set;
 public final class RxJavaProcessor implements ProcessorFactory {
 
     public static final String RXJAVA_THREADS = "rxjava.threads";
+    public static final String RX_BYCODE_ID = "rx:bytecodeId";
+    private static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
 
     private final Map<String, Object> configuration;
 
@@ -47,13 +54,22 @@ public final class RxJavaProcessor implements ProcessorFactory {
 
     @Override
     public <C, S, E> Processor<C, S, E> mint(final Compilation<C, S, E> compilation) {
-        return this.configuration.containsKey(RXJAVA_THREADS) ?
-                new ParallelRxJava<>(compilation, (int) this.configuration.get(RXJAVA_THREADS)) :
-                new SerialRxJava<>(compilation);
+        final int threads = (int) this.configuration.getOrDefault(RxJavaProcessor.RXJAVA_THREADS, 0);
+        final String id = (String) BytecodeUtil.getSourceInstructions(BytecodeUtil.getRootBytecode(compilation.getBytecode()), RX_BYCODE_ID).get(0).args()[0];
+        final ExecutorService threadPool = RxJavaProcessor.THREAD_POOLS.compute(id, (key, value) -> null == value && threads > 0 ? Executors.newCachedThreadPool() : value);
+        // System.out.println(id + "--" + threadPool + "---" + THREAD_POOLS);
+        return null == threadPool ?
+                new SerialRxJava<>(compilation) :
+                new ParallelRxJava<>(compilation, threadPool);
     }
 
     @Override
     public Set<Strategy<?>> getStrategies() {
         return Set.of(new RxJavaStrategy());
     }
+
+    @Override
+    public String toString() {
+        return StringFactory.makeProcessorFactoryString(this);
+    }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
index ed33a98..7e03e6e 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
@@ -20,21 +20,33 @@ package org.apache.tinkerpop.machine.processor.rxjava.strategy;
 
 import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.BytecodeUtil;
-import org.apache.tinkerpop.machine.bytecode.compiler.CoreCompiler;
+import org.apache.tinkerpop.machine.bytecode.SourceInstruction;
+import org.apache.tinkerpop.machine.bytecode.compiler.CommonCompiler;
 import org.apache.tinkerpop.machine.processor.rxjava.RxJavaProcessor;
 import org.apache.tinkerpop.machine.strategy.AbstractStrategy;
 import org.apache.tinkerpop.machine.strategy.Strategy;
 
-import java.util.Map;
+import java.util.List;
+import java.util.UUID;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public class RxJavaStrategy extends AbstractStrategy<Strategy.ProviderStrategy> implements Strategy.ProviderStrategy {
+
+
     @Override
     public <C> void apply(final Bytecode<C> bytecode) {
-        if (!BytecodeUtil.hasSourceInstruction(bytecode, CoreCompiler.Symbols.WITH_PROCESSOR)) {
-            bytecode.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 10)); // TODO: need root in strategies
+        if (bytecode.getParent().isEmpty()) {
+            final String id = UUID.randomUUID().toString();
+            bytecode.addSourceInstruction(RxJavaProcessor.RX_BYCODE_ID, id);
+        } else if (!BytecodeUtil.hasSourceInstruction(bytecode, CommonCompiler.Symbols.WITH_PROCESSOR)) {
+            final Bytecode<C> root = BytecodeUtil.getRootBytecode(bytecode);
+            final List<SourceInstruction> processors = BytecodeUtil.getSourceInstructions(root, CommonCompiler.Symbols.WITH_PROCESSOR);
+            for (final SourceInstruction sourceInstruction : processors) {
+                bytecode.addSourceInstruction(sourceInstruction.op(), sourceInstruction.args());
+            }
         }
     }
+
 }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
index 1051c47..94b49c1 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -42,7 +42,7 @@ class RxJavaBenchmark {
         final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
         final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
         final List<Long> input = new ArrayList<>(1000);
-        for (long i = 0; i < 100000; i++) {
+        for (long i = 0; i < 1000; i++) {
             input.add(i+1);
         }
         final int runs = 30;
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
index 59d73ec..3edd3fd 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleLocalParallelTest.java
@@ -33,7 +33,7 @@ public class SimpleLocalParallelTest extends SimpleTestSuite {
     private final static Bytecode<Long> BYTECODE = new Bytecode<>();
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 10));
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, 20));
     }
 
     SimpleLocalParallelTest() {