You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/23 10:38:29 UTC

[tinkerpop] branch tp4 updated: Processor no longer implements Iterator. It now supports both push-based and pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, I need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next().

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 2cf7cdf  Processor no longer implements Iterator. It now supports both push-based and pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, I need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next().
2cf7cdf is described below

commit 2cf7cdfed05234a22b29a124c6617fb45bf5e7de
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 23 04:38:15 2019 -0600

    Processor no longer implements Iterator. It now supports both push-based and pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, I need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next().
---
 .../machine/bytecode/compiler/Compilation.java     | 39 +++++++------
 .../machine/processor/EmptyProcessor.java          | 16 ++++--
 .../machine/processor/FilterProcessor.java         |  7 ++-
 .../machine/processor/LoopsProcessor.java          |  5 +-
 .../tinkerpop/machine/processor/Processor.java     | 45 +++++++++++++--
 .../machine/processor/SimpleProcessor.java         | 44 +++++++++++----
 .../tinkerpop/machine/species/BasicMachine.java    |  3 +-
 .../tinkerpop/machine/species/LocalMachine.java    |  5 +-
 .../tinkerpop/machine/util/IteratorUtils.java      | 35 +++++++++++-
 .../tinkerpop/machine/processor/beam/Beam.java     | 43 ++++++++++----
 .../machine/processor/pipes/BranchStep.java        |  4 +-
 .../tinkerpop/machine/processor/pipes/Pipes.java   | 45 +++++++++++----
 .../machine/processor/pipes/RepeatStep.java        | 22 +++++---
 .../machine/processor/rxjava/AbstractRxJava.java   | 65 ++++++++++++++++------
 .../machine/processor/rxjava/ParallelRxJava.java   | 31 +++++------
 .../machine/processor/rxjava/SerialRxJava.java     | 15 +++--
 16 files changed, 305 insertions(+), 119 deletions(-)

diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
index 32a04e4..f7105d9 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.machine.structure.EmptyStructure;
 import org.apache.tinkerpop.machine.structure.StructureFactory;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -75,53 +76,51 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
     }
 
     public Processor<C, S, E> getProcessor() {
-        if (null == this.processor)
-            this.processor = this.processorFactory.mint(this);
+        this.prepareProcessor();
         return this.processor;
     }
 
     private void prepareProcessor() {
         if (null == this.processor)
             this.processor = this.processorFactory.mint(this);
-        else
-            this.processor.reset();
     }
 
-    private Traverser<C, S> prepareTraverser(final Traverser<C, S> traverser) {
+    private Iterator<Traverser<C, S>> prepareTraverser(final Traverser<C, S> traverser) {
         final Traverser<C, S> clone = traverser.clone();
         clone.coefficient().unity();
-        return clone;
+        return IteratorUtils.of(clone);
     }
 
     public Traverser<C, E> mapTraverser(final Traverser<C, S> traverser) {
         this.prepareProcessor();
-        this.processor.addStart(this.prepareTraverser(traverser));
-        if (!this.processor.hasNext())
+        final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+        if (!iterator.hasNext())
             throw new RuntimeException("The nested traversal is not a map function: " + this);
-        return this.processor.next();
+        final Traverser<C, E> result = iterator.next();
+        this.processor.stop();
+        return result;
     }
 
     public Traverser<C, E> mapObject(final S object) {
         this.prepareProcessor();
-        this.processor.addStart(this.traverserFactory.create(this.functions.get(0), object));
-        return this.processor.next();
+        final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(this.traverserFactory.create(this.functions.get(0), object)));
+        final Traverser<C, E> result = iterator.next();
+        this.processor.stop();
+        return result;
     }
 
     public Iterator<Traverser<C, E>> flatMapTraverser(final Traverser<C, S> traverser) {
         this.prepareProcessor();
-        this.processor.addStart(this.prepareTraverser(traverser));
-        return this.processor;
+        final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+        return IteratorUtils.onLast(iterator, () -> processor.stop());
     }
 
     public boolean filterTraverser(final Traverser<C, S> traverser) {
         this.prepareProcessor();
-        this.processor.addStart(this.prepareTraverser(traverser));
-        return this.processor.hasNext();
-    }
-
-    public Processor<C, S, E> addTraverser(final Traverser<C, S> traverser) {
-        this.getProcessor().addStart(traverser);
-        return this.processor;
+        final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+        final boolean hasNext = iterator.hasNext();
+        this.processor.stop();
+        return hasNext;
     }
 
     public List<CFunction<C>> getFunctions() {
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
index 127f028..d3da9cb 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
@@ -22,6 +22,10 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.util.FastNoSuchElementException;
 
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Consumer;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -34,22 +38,22 @@ public final class EmptyProcessor<C, S, E> implements Processor<C, S, E>, Proces
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
+    public void stop() {
 
     }
 
     @Override
-    public Traverser<C, E> next() {
-        throw FastNoSuchElementException.instance();
+    public boolean isRunning() {
+        return false;
     }
 
     @Override
-    public boolean hasNext() {
-        return false;
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C,S>> starts) {
+        return Collections.emptyIterator();
     }
 
     @Override
-    public void reset() {
+    public void subscribe(final Iterator<Traverser<C,S>> starts,  final Consumer<Traverser<C, E>> consumer) {
 
     }
 
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
index 1a23279..a83e0d9 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.processor;
 
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.util.Iterator;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -32,9 +34,8 @@ public final class FilterProcessor<C, S> extends SimpleProcessor<C, S, S> {
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
+    protected void processTraverser(Iterator<Traverser<C, S>> starts) {
         if (this.allow)
-            this.traverser = traverser;
+            this.traverser = starts.next();
     }
-
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
index d367356..25a44e1 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.processor;
 
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.util.Iterator;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -32,7 +34,8 @@ public final class LoopsProcessor<C, S> extends SimpleProcessor<C, S, S> {
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
+    protected void processTraverser(final Iterator<Traverser<C, S>> starts) {
+        final Traverser<C, S> traverser = starts.next();
         this.traverser = traverser.loops() == this.loops ? traverser : null;
     }
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
index 282d1a5..6196de4 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
@@ -19,19 +19,54 @@
 package org.apache.tinkerpop.machine.processor;
 
 import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.util.Iterator;
+import java.util.function.Consumer;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public interface Processor<C, S, E> extends Iterator<Traverser<C, E>> {
+public interface Processor<C, S, E> {
 
-    public void addStart(final Traverser<C, S> traverser);
+    /**
+     * A processor is started using {@link Processor#iterator(Iterator)} or {@link Processor#subscribe(Iterator, Consumer)}.
+     * When the iterator is empty or the subscriber has no more traversers to consume, then the process is no longer running.
+     *
+     * @return whether the processor is running
+     */
+    public boolean isRunning();
 
-    public Traverser<C, E> next();
+    /**
+     * When a processor is stopped, subscriptions and iteration are halted.
+     */
+    public void stop();
 
-    public boolean hasNext();
+    /**
+     * Start the processor and return a pull-based iterator.
+     * If pull-based iteration is used, then push-based subscription can not be used while the processor is running.
+     *
+     * @return an iterator of traverser results
+     */
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts);
+
+    public default Iterator<Traverser<C, E>> iterator(final Traverser<C, S> start) {
+        return this.iterator(IteratorUtils.of(start));
+    }
+
+    /**
+     * Start the processor and process the resultant traversers using the push-based consumer.
+     * If push-based subscription is used, then pull-based iteration can not be used while the processor is running.
+     *
+     * @param consumer a consumer of traversers results
+     */
+    public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer);
+
+    public class Exceptions {
+
+        public static IllegalStateException processorIsCurrentlyRunning(final Processor processor) {
+            return new IllegalStateException("The processor can not be started because it is currently running: " + processor);
+        }
+    }
 
-    public void reset();
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
index 1052891..e2a0955 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
@@ -22,6 +22,9 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.util.FastNoSuchElementException;
 
+import java.util.Iterator;
+import java.util.function.Consumer;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -30,23 +33,42 @@ public abstract class SimpleProcessor<C, S, E> implements Processor<C, S, E>, Pr
     protected Traverser<C, E> traverser = null;
 
     @Override
-    public Traverser<C, E> next() {
-        if (null == this.traverser)
-            throw FastNoSuchElementException.instance();
-        else {
-            final Traverser<C, E> temp = this.traverser;
-            this.traverser = null;
-            return temp;
-        }
+    public void stop() {
+        this.traverser = null;
     }
 
     @Override
-    public boolean hasNext() {
+    public boolean isRunning() {
         return null != this.traverser;
     }
 
     @Override
-    public void reset() {
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+        this.processTraverser(starts);
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                return null != traverser;
+            }
+
+            @Override
+            public Traverser<C, E> next() {
+                if (null == traverser)
+                    throw FastNoSuchElementException.instance();
+                else {
+                    final Traverser<C, E> temp = traverser;
+                    traverser = null;
+                    return temp;
+                }
+            }
+        };
+    }
+
+    @Override
+    public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+        this.processTraverser(starts);
+        if (null != this.traverser)
+            consumer.accept(this.traverser);
         this.traverser = null;
     }
 
@@ -54,4 +76,6 @@ public abstract class SimpleProcessor<C, S, E> implements Processor<C, S, E>, Pr
     public <D, T, F> Processor<D, T, F> mint(final Compilation<D, T, F> compilation) {
         return (Processor<D, T, F>) this;
     }
+
+    protected abstract void processTraverser(final Iterator<Traverser<C, S>> starts);
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
index 5dbe4aa..386353a 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 /**
@@ -46,7 +47,7 @@ public final class BasicMachine implements Machine {
 
     @Override
     public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) {
-        return Compilation.<C, Object, E>compile(bytecode).getProcessor();
+        return Compilation.<C, Object, E>compile(bytecode).getProcessor().iterator(Collections.emptyIterator());
     }
 
     public static Machine open() {
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
index cf3e221..e1b9126 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.bytecode.compiler.SourceCompilation;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
@@ -76,8 +77,8 @@ public final class LocalMachine implements Machine {
         final UUID sourceId = LocalMachine.getSourceId(bytecode).orElse(null);
         final SourceCompilation<C> source = (SourceCompilation<C>) this.sources.get(sourceId);
         return null == source ?
-                Compilation.<C, Object, E>compile(bytecode).getProcessor() :
-                Compilation.<C, Object, E>compile(source, bytecode).getProcessor();
+                Compilation.<C, Object, E>compile(bytecode).getProcessor().iterator(Collections.emptyIterator()) :
+                Compilation.<C, Object, E>compile(source, bytecode).getProcessor().iterator(Collections.emptyIterator());
     }
 
     @Override
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
index 439f635..d1bf2df 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
@@ -75,7 +75,40 @@ public final class IteratorUtils {
         return ix;
     }
 
-    public static final long count(final Iterable iterable) {
+    public static <S> Iterator<S> onLast(final Iterator<S> iterator, final Runnable onLast) {
+        return new Iterator<>() {
+            boolean lastExecuted = false;
+
+            @Override
+            public boolean hasNext() {
+                final boolean hasNext = iterator.hasNext();
+                if (!hasNext && !this.lastExecuted) {
+                    this.lastExecuted = true;
+                    onLast.run();
+                }
+                return hasNext;
+            }
+
+            @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
+            public S next() {
+                try {
+                    return iterator.next();
+                } finally {
+                    if (!iterator.hasNext() && !this.lastExecuted) {
+                        this.lastExecuted = true;
+                        onLast.run();
+                    }
+                }
+            }
+        };
+    }
+
+    public static long count(final Iterable iterable) {
         return IteratorUtils.count(iterable.iterator());
     }
 
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index ee235a7..20007c4 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -50,12 +50,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -85,25 +87,46 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
-        // TODO: use side-inputs
+    public void stop() {
+        try {
+            if (null != this.pipelineResult)
+                this.pipelineResult.cancel();
+        } catch (final IOException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        this.pipelineResult = null;
+        this.iterator = null;
     }
 
     @Override
-    public Traverser<C, E> next() {
-        this.setupPipeline();
-        return this.iterator.next();
+    public boolean isRunning() {
+        return null != this.pipelineResult && !this.pipelineResult.getState().isTerminal();
     }
 
     @Override
-    public boolean hasNext() {
-        this.setupPipeline();
-        return this.iterator.hasNext();
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) { // TODO: use side-inputs for starts
+        if (this.isRunning())
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                setupPipeline();
+                return iterator.hasNext();
+            }
+
+            @Override
+            public Traverser<C, E> next() {
+                setupPipeline();
+                return iterator.next();
+            }
+        };
     }
 
     @Override
-    public void reset() {
-        this.iterator = null;
+    public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) { // TODO: use side-inputs for starts
+        if (this.isRunning())
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+        // TODO: create an IteratorOutputFn and a ConsumerOutputFn and connect in setupPipeline()
     }
 
     @Override
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
index 667af99..75d66a6 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
@@ -66,13 +66,13 @@ final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> {
                 if (entry.getKey().filterTraverser(traverser)) {
                     found = true;
                     for (final Compilation<C, S, E> branch : entry.getValue()) {
-                        ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(branch.addTraverser(traverser.clone()));
+                        ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(branch.getProcessor().iterator(traverser.clone()));
                     }
                 }
             }
             if (!found) {
                 for (final Compilation<C, S, E> defaultBranch : this.defaultBranches) {
-                    ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(defaultBranch.addTraverser(traverser.clone()));
+                    ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(defaultBranch.getProcessor().iterator(traverser.clone()));
                 }
             }
         }
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index e370c58..9f3c7c8 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -31,9 +31,13 @@ import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.processor.pipes.util.InMemoryReducer;
 import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -43,6 +47,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
     private final List<Step<?, ?, ?>> steps = new ArrayList<>();
     private Step<C, ?, E> endStep;
     private SourceStep<C, S> startStep;
+    private AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE);
 
     public Pipes(final Compilation<C, S, E> compilation) {
         Step<C, ?, S> previousStep = EmptyStep.instance();
@@ -81,25 +86,45 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
-        this.startStep.addStart(traverser);
+    public void stop() {
+        this.alive.set(Boolean.FALSE);
+        for (final Step<?, ?, ?> step : this.steps) {
+            step.reset();
+        }
     }
 
     @Override
-    public Traverser<C, E> next() {
-        return this.endStep.next();
+    public boolean isRunning() {
+        return this.alive.get();
     }
 
     @Override
-    public boolean hasNext() {
-        return this.endStep.hasNext();
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+
+        if (this.isRunning())
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+        this.alive.set(Boolean.TRUE);
+        if (null != this.startStep)
+            starts.forEachRemaining(this.startStep::addStart);
+        return IteratorUtils.onLast(this.endStep, () -> this.alive.set(Boolean.FALSE));
     }
 
+
     @Override
-    public void reset() {
-        for (final Step<?, ?, ?> step : this.steps) {
-            step.reset();
-        }
+    public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+        if (this.isRunning())
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+        this.alive.set(Boolean.TRUE);
+        new Thread(() -> {
+            final Iterator<Traverser<C, E>> iterator = this.iterator(starts);
+            while (iterator.hasNext()) {
+                if (!this.alive.get())
+                    break;
+                consumer.accept(iterator.next());
+            }
+        }).start();
     }
 
     @Override
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
index 8dc25f3..4c5fa5d 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
@@ -22,6 +22,10 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+
+import java.util.Collections;
+import java.util.Iterator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -34,6 +38,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
     private final Compilation<C, S, ?> untilCompilation;
     private final Compilation<C, S, ?> emitCompilation;
     private final Compilation<C, S, S> repeat;
+    private Iterator<Traverser<C, S>> repeatIterator = Collections.emptyIterator();
+    private TraverserSet<C, S> repeatStarts = new TraverserSet<>();
     private TraverserSet<C, S> outputTraversers = new TraverserSet<>();
     private TraverserSet<C, S> inputTraversers = new TraverserSet<>();
     private final boolean hasStartPredicates;
@@ -72,19 +78,19 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
                         this.outputTraversers.add(traverser);
                     } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
                         this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
-                        this.repeat.addTraverser(traverser);
+                        this.repeatStarts.add(traverser);
                     } else
-                        this.repeat.addTraverser(traverser);
+                        this.repeatStarts.add(traverser);
                 } else if (1 == this.emitLocation) {
                     if (this.emitCompilation.filterTraverser(traverser))
                         this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
                     if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
                         this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
                     else
-                        this.repeat.addTraverser(traverser);
+                        this.repeatStarts.add(traverser);
                 }
             } else {
-                this.repeat.addTraverser(traverser);
+                this.repeatStarts.add(traverser);
             }
             return true;
         }
@@ -92,9 +98,9 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
     }
 
     private void stageOutput() {
-        while (this.outputTraversers.isEmpty() && (this.repeat.getProcessor().hasNext() || this.stageInput())) {
-            if (this.repeat.getProcessor().hasNext()) {
-                final Traverser<C, S> traverser = this.repeat.getProcessor().next().repeatLoop(this.repeatBranch);
+        while (this.outputTraversers.isEmpty() && (this.repeatIterator.hasNext() || this.stageInput())) {
+            if (this.repeatIterator.hasNext()) {
+                final Traverser<C, S> traverser = this.repeatIterator.next().repeatLoop(this.repeatBranch);
                 if (this.hasEndPredicates) {
                     if (3 == this.untilLocation) {
                         if (this.untilCompilation.filterTraverser(traverser)) {
@@ -115,6 +121,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
                 } else {
                     this.inputTraversers.add(traverser);
                 }
+            } else {
+                this.repeatIterator = this.repeat.getProcessor().iterator(IteratorUtils.removeOnNext(this.repeatStarts.iterator()));
             }
         }
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 6c478f4..d16a4e1 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -24,6 +24,10 @@ import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
 
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -31,10 +35,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
 
     static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
 
-    boolean executed = false;
     Disposable disposable;
+    final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
     final TraverserSet<C, S> starts = new TraverserSet<>();
-    final TraverserSet<C, E> ends = new TraverserSet<>();
+    private final TraverserSet<C, E> ends = new TraverserSet<>();
     final Compilation<C, S, E> compilation;
 
     AbstractRxJava(final Compilation<C, S, E> compilation) {
@@ -42,35 +46,62 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
     }
 
     @Override
-    public void addStart(final Traverser<C, S> traverser) {
-        this.starts.add(traverser);
+    public void stop() {
+        if (null != this.disposable) {
+            this.disposable.dispose();
+            this.disposable = null;
+        }
+        this.starts.clear();
+        this.ends.clear();
+        this.running.set(Boolean.FALSE);
     }
 
     @Override
-    public Traverser<C, E> next() {
-        this.prepareFlow();
-        return this.ends.remove();
+    public boolean isRunning() {
+        return this.running.get();
     }
 
     @Override
-    public boolean hasNext() {
-        this.prepareFlow();
-        return !this.ends.isEmpty();
+    public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+        // claim the processor atomically: a separate isRunning() check followed by set(TRUE) lets two callers both pass
+        if (!this.running.compareAndSet(false, true))
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+        this.starts.clear();
+        this.ends.clear();
+        starts.forEachRemaining(this.starts::add);
+        this.prepareFlow(this.ends::add);
+        return new Iterator<>() {
+            @Override
+            public boolean hasNext() {
+                waitForCompletionOrResult();
+                return !ends.isEmpty();
+            }
+
+            @Override
+            public Traverser<C, E> next() {
+                waitForCompletionOrResult();
+                return ends.remove();
+            }
+        };
     }
 
     @Override
-    public void reset() {
-        if (null != this.disposable)
-            this.disposable.dispose();
+    public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+        // claim the processor atomically: a separate isRunning() check followed by set(TRUE) lets two callers both pass
+        if (!this.running.compareAndSet(false, true))
+            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
         this.starts.clear();
         this.ends.clear();
-        this.executed = false;
+        starts.forEachRemaining(this.starts::add);
+        this.prepareFlow(consumer::accept);
     }
 
-    protected abstract void prepareFlow();
+    protected abstract void prepareFlow(final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer);
 
-    void waitForCompletionOrResult() {
-        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
+    private void waitForCompletionOrResult() {
+        while (this.ends.isEmpty() && this.isRunning()) {
             // wait until either the flow is complete or there is a traverser result
         }
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 085168a..b0d92b1 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.processor.rxjava;
 
 import io.reactivex.Flowable;
+import io.reactivex.functions.Consumer;
 import io.reactivex.parallel.ParallelFlowable;
 import io.reactivex.processors.PublishProcessor;
 import io.reactivex.schedulers.Schedulers;
@@ -33,6 +34,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 import org.apache.tinkerpop.machine.util.IteratorUtils;
@@ -63,22 +65,19 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     }
 
     @Override
-    protected void prepareFlow() {
-        if (!this.executed) {
-            this.executed = true;
-            this.disposable = this.flowable.
-                    doOnNext(this.ends::add).
-                    sequential().
-                    doFinally(() -> {
-                        if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
-                            this.threadPool.shutdown();
-                            RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
-                        }
-                    }).
-                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
-
-        }
-        this.waitForCompletionOrResult();
+    protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+        this.disposable = this.flowable
+                .doOnNext(consumer)
+                .sequential()
+                .subscribeOn(Schedulers.newThread()) // don't block the execution so results can be streamed back in real-time
+                .doFinally(() -> {
+                    if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
+                        RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
+                        this.threadPool.shutdown();
+                    }
+                    this.running.set(Boolean.FALSE);
+                })
+                .subscribe();
     }
 
     // EXECUTION PLAN COMPILER
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index ee04fb3..0d4cbce 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.machine.processor.rxjava;
 
 import io.reactivex.Flowable;
+import io.reactivex.functions.Consumer;
 import io.reactivex.processors.PublishProcessor;
 import io.reactivex.schedulers.Schedulers;
 import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
@@ -31,6 +32,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 import org.apache.tinkerpop.machine.util.IteratorUtils;
@@ -54,14 +56,11 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     }
 
     @Override
-    protected void prepareFlow() {
-        if (!this.executed) {
-            this.executed = true;
-            this.disposable = this.flowable.
-                    doOnNext(this.ends::add).
-                    subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
-        }
-        this.waitForCompletionOrResult();
+    protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+        this.disposable = this.flowable
+                .subscribeOn(Schedulers.newThread())  // don't block the execution so results can be streamed back in real-time
+                .doFinally(() -> this.running.set(Boolean.FALSE))
+                .subscribe(consumer);
     }
 
     // EXECUTION PLAN COMPILER