You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/08 15:54:17 UTC

[tinkerpop] branch tp4 updated: got ParallelFlowable really nice in rxJava. Had to do some thread-safety work on TraverserSet. Having a weird socket issue where tests close and restart too fast, before the socket can be fully closed. Don't know why this cropped up.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 51f8eb9  got ParallelFlowable really nice in rxJava. Had to do some thread-safety work on TraverserSet. Having a weird socket issue where tests close and restart too fast, before the socket can be fully closed. Don't know why this cropped up.
51f8eb9 is described below

commit 51f8eb9fdd906eba9336b7eb4eb4389dcd69e44a
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 09:54:06 2019 -0600

    got ParallelFlowable really nice in rxJava. Had to do some thread-safety work on TraverserSet. Having a weird socket issue where tests close and restart too fast, before the socket can be fully closed. Don't know why this cropped up.
---
 .../machine/function/branch/RepeatBranch.java      |  4 +-
 .../machine/species/remote/MachineServer.java      |  8 ++--
 .../machine/species/remote/TraverserServer.java    | 16 ++++++--
 .../tinkerpop/machine/traverser/TraverserSet.java  | 32 +++++++++-------
 .../apache/tinkerpop/machine/SimpleTestSuite.java  |  2 +-
 .../machine/processor/rxjava/AbstractRxJava.java   |  3 +-
 .../machine/processor/rxjava/FlatMapFlow.java      |  6 +--
 .../machine/processor/rxjava/ParallelRxJava.java   | 44 +++++++++++-----------
 .../machine/processor/rxjava/RepeatEnd.java        | 32 +++++++++-------
 .../machine/processor/rxjava/RepeatStart.java      | 30 ++++++++-------
 .../machine/processor/rxjava/SerialRxJava.java     | 11 ++++--
 11 files changed, 109 insertions(+), 79 deletions(-)

diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
index eddb43b..378eb6d 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/branch/RepeatBranch.java
@@ -112,8 +112,8 @@ public final class RepeatBranch<C, S> extends AbstractFunction<C> {
     public RepeatBranch<C, S> clone() {
         final RepeatBranch<C, S> clone = (RepeatBranch<C, S>) super.clone();
         clone.repeatCompilation = this.repeatCompilation.clone();
-        clone.emitCompilation = this.emitCompilation.clone();
-        clone.untilCompilation = this.untilCompilation.clone();
+        clone.emitCompilation = null == this.emitCompilation ? null : this.emitCompilation.clone();
+        clone.untilCompilation = null == this.untilCompilation ? null : this.untilCompilation.clone();
         return clone;
     }
 
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
index 8931f40..e041d83 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
@@ -41,7 +41,7 @@ public final class MachineServer implements AutoCloseable {
 
     private final int machineServerPort;
     private ServerSocket machineServerSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
     private final Machine machine = LocalMachine.open();
 
     public MachineServer(final int machineServerPort) {
@@ -51,6 +51,7 @@ public final class MachineServer implements AutoCloseable {
 
     private void run() {
         try {
+            this.serverAlive.set(Boolean.TRUE);
             this.machineServerSocket = new ServerSocket(this.machineServerPort);
             while (this.serverAlive.get()) {
                 final Socket clientSocket = this.machineServerSocket.accept();
@@ -65,9 +66,10 @@ public final class MachineServer implements AutoCloseable {
     public void close() {
         if (this.serverAlive.get()) {
             try {
-                this.serverAlive.set(Boolean.FALSE);
-                this.machineServerSocket.close();
+                if (null != this.machineServerSocket)
+                    this.machineServerSocket.close();
                 this.machine.close();
+                this.serverAlive.set(Boolean.FALSE);
             } catch (final IOException e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
index d647f96..4e61fff 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
@@ -38,7 +38,7 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav
     private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
     private final int serverPort;
     private ServerSocket serverSocket;
-    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.TRUE);
+    private AtomicBoolean serverAlive = new AtomicBoolean(Boolean.FALSE);
 
     public TraverserServer(final int serverPort) {
         this.serverPort = serverPort;
@@ -47,6 +47,7 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav
 
     private void run() {
         try {
+            this.serverAlive.set(Boolean.TRUE);
             this.serverSocket = new ServerSocket(this.serverPort);
             while (this.serverAlive.get()) {
                 final Socket clientSocket = this.serverSocket.accept();
@@ -73,15 +74,24 @@ public final class TraverserServer<C, S> implements AutoCloseable, Iterator<Trav
 
     @Override
     public Traverser<C, S> next() {
-        return this.traverserSet.remove();
+        if (!this.traverserSet.isEmpty())
+            return this.traverserSet.remove();
+        else {
+            while (this.serverAlive.get()) {
+                if (!this.traverserSet.isEmpty())
+                    return this.traverserSet.remove();
+            }
+            return this.traverserSet.remove();
+        }
     }
 
     @Override
     public synchronized void close() {
         if (this.serverAlive.get()) {
             try {
+                if (null != this.serverSocket)
+                    this.serverSocket.close();
                 this.serverAlive.set(Boolean.FALSE);
-                this.serverSocket.close();
             } catch (final IOException e) {
                 throw new RuntimeException(e.getMessage(), e);
             }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
index 8c40dbb..7fdbf86 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
@@ -76,13 +76,15 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple
 
     @Override
     public boolean add(final Traverser<C, S> traverser) {
-        final Traverser<C, S> existing = this.map.get(traverser);
-        if (null == existing) {
-            this.map.put(traverser, traverser);
-            return true;
-        } else {
-            existing.coefficient().sum(traverser.coefficient());
-            return false;
+        synchronized (this.map) {
+            final Traverser<C, S> existing = this.map.get(traverser);
+            if (null == existing) {
+                this.map.put(traverser, traverser);
+                return true;
+            } else {
+                existing.coefficient().sum(traverser.coefficient());
+                return false;
+            }
         }
     }
 
@@ -93,12 +95,14 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple
 
     @Override
     public Traverser<C, S> remove() {  // pop, exception if empty
-        final Iterator<Traverser<C, S>> iterator = this.map.values().iterator();
-        if (!iterator.hasNext())
-            throw FastNoSuchElementException.instance();
-        final Traverser<C, S> next = iterator.next();
-        iterator.remove();
-        return next;
+        synchronized (this.map) {
+            final Iterator<Traverser<C, S>> iterator = this.map.values().iterator();
+            if (!iterator.hasNext())
+                throw FastNoSuchElementException.instance();
+            final Traverser<C, S> next = iterator.next();
+            iterator.remove();
+            return next;
+        }
     }
 
     @Override
@@ -136,7 +140,7 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple
         return this.map.values().toString();
     }
 
-    public void sort(final Comparator<Traverser<C,S>> comparator) {
+    public void sort(final Comparator<Traverser<C, S>> comparator) {
         final List<Traverser<C, S>> list = new ArrayList<>(this.map.size());
         IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.sort(list, comparator);
diff --git a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
index b53577b..31177e4 100644
--- a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
+++ b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
@@ -235,7 +235,7 @@ public class SimpleTestSuite extends AbstractTestSuite<Long> {
 
     @Test
     void g_injectXlistX1_2_3XX_unfold_incr() {
-        verifyOrder(List.of(2L, 3L, 4L),
+        verify(List.of(2L, 3L, 4L),
                 g.inject(List.of(1L, 2L, 3L)).unfold().incr());
     }
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 9d05fd2..4da0657 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -32,8 +32,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
 
     static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
 
-    final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
     boolean executed = false;
+    final AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE);
     final TraverserSet<C, S> starts = new TraverserSet<>();
     final TraverserSet<C, E> ends = new TraverserSet<>();
     final Compilation<C, S, E> compilation;
@@ -64,6 +64,7 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
         this.starts.clear();
         this.ends.clear();
         this.executed = false;
+        this.alive.set(Boolean.FALSE);
     }
 
     protected abstract void prepareFlow();
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
index 0a57f0d..2d1cd9b 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> {
 
-    private FlatMapFunction<C, S, E> function;
+    private ThreadLocal<FlatMapFunction<C, S, E>> function;
 
     FlatMapFlow(final FlatMapFunction<C, S, E> function) {
-        this.function = function;
+        this.function = ThreadLocal.withInitial(() -> (FlatMapFunction) function.clone());
     }
 
     @Override
     public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
-        return () -> traverser.flatMap(this.function);
+        return () -> traverser.flatMap(this.function.get());
     }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index a55ae4c..75bb398 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -49,6 +49,7 @@ import java.util.concurrent.Executors;
 public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
     private final int threads;
+    private ExecutorService threadPool;
 
     ParallelRxJava(final Compilation<C, S, E> compilation, final int threads) {
         super(compilation);
@@ -58,43 +59,46 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
-            ExecutorService threadPool = Executors.newFixedThreadPool(this.threads);
             this.executed = true;
-            ParallelRxJava.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(threadPool)), this.compilation).
+            this.alive.set(Boolean.TRUE);
+            this.threadPool = Executors.newFixedThreadPool(this.threads);
+            this.compile(
+                    ParallelFlowable.from(Flowable.fromIterable(this.starts)).
+                            runOn(Schedulers.from(this.threadPool)), this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    doFinally(threadPool::shutdown).
-                    blockingSubscribe();
+                    doFinally(this.threadPool::shutdown).
+                    blockingSubscribe(); // thread this so results can be received before computation completes
         }
     }
 
     // EXECUTION PLAN COMPILER
 
-    private static <C, S, E> ParallelFlowable<Traverser<C, E>> compile(final ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
+    private ParallelFlowable<Traverser<C, E>> compile(final ParallelFlowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
         final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
         ParallelFlowable<Traverser<C, E>> sink = (ParallelFlowable) source;
         for (final CFunction<C> function : compilation.getFunctions()) {
-            sink = ParallelRxJava.extend(sink, function, traverserFactory);
+            sink = this.extend((ParallelFlowable) sink, function, traverserFactory);
         }
         return sink;
     }
 
-    private static <C, S, E, B> ParallelFlowable<Traverser<C, E>> extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
+    private <B> ParallelFlowable<Traverser<C, E>> extend(ParallelFlowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
         if (function instanceof MapFunction)
             return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
         else if (function instanceof FilterFunction) {
             return (ParallelFlowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function));
         } else if (function instanceof FlatMapFunction) {
-            return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel();
+            return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof InitialFunction) {
-            return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel();
+            return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof ReduceFunction) {
             final ReduceFunction<C, S, E> reduceFunction = (ReduceFunction<C, S, E>) function;
-            return flow.sequential().reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new Reducer<>(reduceFunction)).toFlowable().parallel();
+            return flow.sequential().reduce(traverserFactory.create(reduceFunction, reduceFunction.getInitialValue()), new Reducer<>(reduceFunction)).toFlowable().parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof BarrierFunction) {
             final BarrierFunction<C, S, E, B> barrierFunction = (BarrierFunction<C, S, E, B>) function;
-            return flow.sequential().reduce(barrierFunction.getInitialValue(), new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new BarrierFlow<>(barrierFunction, traverserFactory)).parallel();
+            return flow.sequential().reduce(barrierFunction.getInitialValue(), new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new BarrierFlow<>(barrierFunction, traverserFactory)).parallel(1); // order requires serial
         } else if (function instanceof BranchFunction) {
             final ParallelFlowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function));
             final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>();
@@ -103,31 +107,29 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                 final int branchId = null == branches.getKey() ? -1 : branchCounter;
                 branchCounter++;
                 for (final Compilation<C, S, E> branch : branches.getValue()) {
-                    branchFlows.add(compile(selectorFlow.
+                    branchFlows.add(this.compile(selectorFlow.
                                     filter(list -> list.get(0).equals(branchId)).
                                     map(list -> (Traverser<C, S>) list.get(1)),
                             branch).sequential());
                 }
             }
-            return PublishProcessor.merge(branchFlows).parallel();
+            return PublishProcessor.merge(branchFlows).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function;
             final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
             ParallelFlowable<List> selectorFlow;
             for (int i = 0; i < MAX_REPETITIONS; i++) {
                 if (repeatBranch.hasStartPredicates()) {
-                    selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel();
+                    selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
                     outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
-                    flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat());
-                } else {
-                    flow = compile(flow, repeatBranch.getRepeat());
-                }
-                selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel();
+                    flow = this.compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), (Compilation) repeatBranch.getRepeat());
+                } else
+                    flow = this.compile(flow, (Compilation) repeatBranch.getRepeat());
+                selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
                 outputs.add(selectorFlow.sequential().filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                 flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
             }
-
-            return (ParallelFlowable) PublishProcessor.merge(outputs).parallel();
+            return (ParallelFlowable) PublishProcessor.merge(outputs).parallel().runOn(Schedulers.from(this.threadPool));
         }
         throw new RuntimeException("Need a new execution plan step: " + function);
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
index 9086f32..0b19111 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatEnd.java
@@ -30,30 +30,30 @@ import java.util.List;
  */
 public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, List<List>> {
 
-    private final RepeatBranch<C, S> repeatBranch;
+    private final ThreadLocal<RepeatBranch<C, S>> repeatBranch;
 
     RepeatEnd(final RepeatBranch<C, S> repeatBranch) {
-        this.repeatBranch = repeatBranch;
+        this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone);
     }
 
     @Override
     public List<List> apply(final Traverser<C, S> traverser) {
-        final Traverser<C,S> t = traverser.repeatLoop(this.repeatBranch);
+        final Traverser<C, S> t = traverser.repeatLoop(this.getRepeatBranch());
         final List<List> list = new ArrayList<>();
-        if (this.repeatBranch.hasEndPredicates()) {
-            if (3 == this.repeatBranch.getUntilLocation()) {
-                if (this.repeatBranch.getUntil().filterTraverser(t)) {
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
-                } else if (4 == this.repeatBranch.getEmitLocation() && this.repeatBranch.getEmit().filterTraverser(t)) {
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+        if (this.repeatBranch.get().hasEndPredicates()) {
+            if (3 == this.getRepeatBranch().getUntilLocation()) {
+                if (this.getRepeatBranch().getUntil().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
+                } else if (4 == this.getRepeatBranch().getEmitLocation() && this.getRepeatBranch().getEmit().filterTraverser(t)) {
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
                     list.add(List.of(1, t));
                 } else
                     list.add(List.of(1, t));
-            } else if (3 == this.repeatBranch.getEmitLocation()) {
-                if (this.repeatBranch.getEmit().filterTraverser(t))
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
-                if (4 == this.repeatBranch.getUntilLocation() && this.repeatBranch.getUntil().filterTraverser(t))
-                    list.add(List.of(0, t.repeatDone(this.repeatBranch)));
+            } else if (3 == this.getRepeatBranch().getEmitLocation()) {
+                if (this.getRepeatBranch().getEmit().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
+                if (4 == this.getRepeatBranch().getUntilLocation() && this.getRepeatBranch().getUntil().filterTraverser(t))
+                    list.add(List.of(0, t.repeatDone(this.getRepeatBranch())));
                 else
                     list.add(List.of(1, t));
             }
@@ -61,4 +61,8 @@ public final class RepeatEnd<C, S> implements Function<Traverser<C, S>, List<Lis
             list.add(List.of(1, t));
         return list;
     }
+
+    private RepeatBranch<C, S> getRepeatBranch() {
+        return this.repeatBranch.get();
+    }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
index 5dea785..0620efa 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RepeatStart.java
@@ -30,29 +30,29 @@ import java.util.List;
  */
 public final class RepeatStart<C, S> implements Function<Traverser<C, S>, List<List>> {
 
-    private final RepeatBranch<C, S> repeatBranch;
+    private final ThreadLocal<RepeatBranch<C, S>> repeatBranch;
 
     RepeatStart(final RepeatBranch<C, S> repeatBranch) {
-        this.repeatBranch = repeatBranch;
+        this.repeatBranch = ThreadLocal.withInitial(repeatBranch::clone);
     }
 
     @Override
     public List<List> apply(final Traverser<C, S> traverser) {
         final List<List> list = new ArrayList<>();
-        if (this.repeatBranch.hasStartPredicates()) {
-            if (1 == this.repeatBranch.getUntilLocation()) {
-                if (this.repeatBranch.getUntil().filterTraverser(traverser)) {
-                    list.add(List.of(0, traverser.repeatDone(this.repeatBranch)));
-                } else if (2 == this.repeatBranch.getEmitLocation() && this.repeatBranch.getEmit().filterTraverser(traverser)) {
+        if (this.getRepeatBranch().hasStartPredicates()) {
+            if (1 == this.getRepeatBranch().getUntilLocation()) {
+                if (this.getRepeatBranch().getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch())));
+                } else if (2 == this.getRepeatBranch().getEmitLocation() && this.getRepeatBranch().getEmit().filterTraverser(traverser)) {
                     list.add(List.of(1, traverser));
-                    list.add(List.of(0, traverser.repeatDone(this.repeatBranch)));
+                    list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch())));
                 } else
                     list.add(List.of(1, traverser));
-            } else if (1 == this.repeatBranch.getEmitLocation()) {
-                if (this.repeatBranch.getEmit().filterTraverser(traverser))
-                    list.add(List.of(0, traverser.repeatDone(this.repeatBranch)));
-                if (2 == this.repeatBranch.getUntilLocation() && this.repeatBranch.getUntil().filterTraverser(traverser)) {
-                    list.add(List.of(0, traverser.repeatDone(this.repeatBranch)));
+            } else if (1 == this.getRepeatBranch().getEmitLocation()) {
+                if (this.getRepeatBranch().getEmit().filterTraverser(traverser))
+                    list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch())));
+                if (2 == this.getRepeatBranch().getUntilLocation() && this.getRepeatBranch().getUntil().filterTraverser(traverser)) {
+                    list.add(List.of(0, traverser.repeatDone(this.getRepeatBranch())));
                 } else
                     list.add(List.of(1, traverser));
             }
@@ -60,4 +60,8 @@ public final class RepeatStart<C, S> implements Function<Traverser<C, S>, List<L
             list.add(List.of(1, traverser));
         return list;
     }
+
+    private RepeatBranch<C, S> getRepeatBranch() {
+        return this.repeatBranch.get();
+    }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 40fc11e..5331ee3 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -44,7 +44,7 @@ import java.util.Map;
  */
 public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
-    public SerialRxJava(final Compilation<C, S, E> compilation) {
+    SerialRxJava(final Compilation<C, S, E> compilation) {
         super(compilation);
     }
 
@@ -52,10 +52,14 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
+            this.alive.set(Boolean.TRUE);
             SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
                     doOnNext(this.ends::add).
                     doOnComplete(() -> this.alive.set(Boolean.FALSE)).
-                    blockingSubscribe();
+                    subscribe();
+        }
+        while (this.alive.get() && this.ends.isEmpty()) {
+            // only return if there is a result ready from the flow (or the flow is dead)
         }
     }
 
@@ -109,9 +113,8 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                     selectorFlow = flow.flatMapIterable(new RepeatStart<>(repeatBranch));
                     outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                     flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat());
-                } else {
+                } else
                     flow = compile(flow, repeatBranch.getRepeat());
-                }
                 selectorFlow = flow.flatMapIterable(new RepeatEnd<>(repeatBranch));
                 outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                 flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));