You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/23 10:38:29 UTC
[tinkerpop] branch tp4 updated: Processor no longer implements
Iterator. It now supports both a push-based and a pull-based execution via
subscribe(Consumer) and iterator(),
respectively. I haven't done this properly for Beam as I need to rework
TraverserServer. Also,
I need to think about how to use Compilation within a push-based system so
that local/nested traversals don't block on iterator.next().
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 2cf7cdf Processor no longer implements Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, I need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next().
2cf7cdf is described below
commit 2cf7cdfed05234a22b29a124c6617fb45bf5e7de
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 23 04:38:15 2019 -0600
Processor no longer implements Iterator. It now supports both a push-based and a pull-based execution via subscribe(Consumer) and iterator(), respectively. I haven't done this properly for Beam as I need to rework TraverserServer. Also, I need to think about how to use Compilation within a push-based system so that local/nested traversals don't block on iterator.next().
---
.../machine/bytecode/compiler/Compilation.java | 39 +++++++------
.../machine/processor/EmptyProcessor.java | 16 ++++--
.../machine/processor/FilterProcessor.java | 7 ++-
.../machine/processor/LoopsProcessor.java | 5 +-
.../tinkerpop/machine/processor/Processor.java | 45 +++++++++++++--
.../machine/processor/SimpleProcessor.java | 44 +++++++++++----
.../tinkerpop/machine/species/BasicMachine.java | 3 +-
.../tinkerpop/machine/species/LocalMachine.java | 5 +-
.../tinkerpop/machine/util/IteratorUtils.java | 35 +++++++++++-
.../tinkerpop/machine/processor/beam/Beam.java | 43 ++++++++++----
.../machine/processor/pipes/BranchStep.java | 4 +-
.../tinkerpop/machine/processor/pipes/Pipes.java | 45 +++++++++++----
.../machine/processor/pipes/RepeatStep.java | 22 +++++---
.../machine/processor/rxjava/AbstractRxJava.java | 65 ++++++++++++++++------
.../machine/processor/rxjava/ParallelRxJava.java | 31 +++++------
.../machine/processor/rxjava/SerialRxJava.java | 15 +++--
16 files changed, 305 insertions(+), 119 deletions(-)
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
index 32a04e4..f7105d9 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/bytecode/compiler/Compilation.java
@@ -28,6 +28,7 @@ import org.apache.tinkerpop.machine.structure.EmptyStructure;
import org.apache.tinkerpop.machine.structure.StructureFactory;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
import java.io.Serializable;
import java.util.ArrayList;
@@ -75,53 +76,51 @@ public final class Compilation<C, S, E> implements Serializable, Cloneable {
}
public Processor<C, S, E> getProcessor() {
- if (null == this.processor)
- this.processor = this.processorFactory.mint(this);
+ this.prepareProcessor();
return this.processor;
}
private void prepareProcessor() {
if (null == this.processor)
this.processor = this.processorFactory.mint(this);
- else
- this.processor.reset();
}
- private Traverser<C, S> prepareTraverser(final Traverser<C, S> traverser) {
+ private Iterator<Traverser<C, S>> prepareTraverser(final Traverser<C, S> traverser) {
final Traverser<C, S> clone = traverser.clone();
clone.coefficient().unity();
- return clone;
+ return IteratorUtils.of(clone);
}
public Traverser<C, E> mapTraverser(final Traverser<C, S> traverser) {
this.prepareProcessor();
- this.processor.addStart(this.prepareTraverser(traverser));
- if (!this.processor.hasNext())
+ final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+ if (!iterator.hasNext())
throw new RuntimeException("The nested traversal is not a map function: " + this);
- return this.processor.next();
+ final Traverser<C, E> result = iterator.next();
+ this.processor.stop();
+ return result;
}
public Traverser<C, E> mapObject(final S object) {
this.prepareProcessor();
- this.processor.addStart(this.traverserFactory.create(this.functions.get(0), object));
- return this.processor.next();
+ final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(this.traverserFactory.create(this.functions.get(0), object)));
+ final Traverser<C, E> result = iterator.next();
+ this.processor.stop();
+ return result;
}
public Iterator<Traverser<C, E>> flatMapTraverser(final Traverser<C, S> traverser) {
this.prepareProcessor();
- this.processor.addStart(this.prepareTraverser(traverser));
- return this.processor;
+ final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+ return IteratorUtils.onLast(iterator, () -> processor.stop());
}
public boolean filterTraverser(final Traverser<C, S> traverser) {
this.prepareProcessor();
- this.processor.addStart(this.prepareTraverser(traverser));
- return this.processor.hasNext();
- }
-
- public Processor<C, S, E> addTraverser(final Traverser<C, S> traverser) {
- this.getProcessor().addStart(traverser);
- return this.processor;
+ final Iterator<Traverser<C, E>> iterator = this.processor.iterator(this.prepareTraverser(traverser));
+ final boolean hasNext = iterator.hasNext();
+ this.processor.stop();
+ return hasNext;
}
public List<CFunction<C>> getFunctions() {
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
index 127f028..d3da9cb 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/EmptyProcessor.java
@@ -22,6 +22,10 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.util.FastNoSuchElementException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Consumer;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -34,22 +38,22 @@ public final class EmptyProcessor<C, S, E> implements Processor<C, S, E>, Proces
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
+ public void stop() {
}
@Override
- public Traverser<C, E> next() {
- throw FastNoSuchElementException.instance();
+ public boolean isRunning() {
+ return false;
}
@Override
- public boolean hasNext() {
- return false;
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C,S>> starts) {
+ return Collections.emptyIterator();
}
@Override
- public void reset() {
+ public void subscribe(final Iterator<Traverser<C,S>> starts, final Consumer<Traverser<C, E>> consumer) {
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
index 1a23279..a83e0d9 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/FilterProcessor.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.util.Iterator;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -32,9 +34,8 @@ public final class FilterProcessor<C, S> extends SimpleProcessor<C, S, S> {
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
+ protected void processTraverser(Iterator<Traverser<C, S>> starts) {
if (this.allow)
- this.traverser = traverser;
+ this.traverser = starts.next();
}
-
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
index d367356..25a44e1 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/LoopsProcessor.java
@@ -20,6 +20,8 @@ package org.apache.tinkerpop.machine.processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.util.Iterator;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -32,7 +34,8 @@ public final class LoopsProcessor<C, S> extends SimpleProcessor<C, S, S> {
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
+ protected void processTraverser(final Iterator<Traverser<C, S>> starts) {
+ final Traverser<C, S> traverser = starts.next();
this.traverser = traverser.loops() == this.loops ? traverser : null;
}
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
index 282d1a5..6196de4 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
@@ -19,19 +19,54 @@
package org.apache.tinkerpop.machine.processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
import java.util.Iterator;
+import java.util.function.Consumer;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface Processor<C, S, E> extends Iterator<Traverser<C, E>> {
+public interface Processor<C, S, E> {
- public void addStart(final Traverser<C, S> traverser);
+ /**
+ * A processor is started using {@link Processor#iterator(Iterator)} or {@link Processor#subscribe(Iterator, Consumer)}.
+ * When the iterator is empty or the subscriber has no more traversers to consume, then the process is no longer running.
+ *
+ * @return whether the processor is running
+ */
+ public boolean isRunning();
- public Traverser<C, E> next();
+ /**
+ * When a processor is stopped, subscriptions and iteration are halted.
+ */
+ public void stop();
- public boolean hasNext();
+ /**
+ * Start the processor and return a pull-based iterator.
+ * If pull-based iteration is used, then push-based subscription can not be used while the processor is running.
+ *
+ * @return an iterator of traverser results
+ */
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts);
+
+ public default Iterator<Traverser<C, E>> iterator(final Traverser<C, S> start) {
+ return this.iterator(IteratorUtils.of(start));
+ }
+
+ /**
+ * Start the processor and process the resultant traversers using the push-based consumer.
+ * If push-based subscription is used, then pull-based iteration can not be used while the processor is running.
+ *
+ * @param consumer a consumer of traversers results
+ */
+ public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer);
+
+ public class Exceptions {
+
+ public static IllegalStateException processorIsCurrentlyRunning(final Processor processor) {
+ return new IllegalStateException("The processor can not be started because it is currently running: " + processor);
+ }
+ }
- public void reset();
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
index 1052891..e2a0955 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/SimpleProcessor.java
@@ -22,6 +22,9 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.util.FastNoSuchElementException;
+import java.util.Iterator;
+import java.util.function.Consumer;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -30,23 +33,42 @@ public abstract class SimpleProcessor<C, S, E> implements Processor<C, S, E>, Pr
protected Traverser<C, E> traverser = null;
@Override
- public Traverser<C, E> next() {
- if (null == this.traverser)
- throw FastNoSuchElementException.instance();
- else {
- final Traverser<C, E> temp = this.traverser;
- this.traverser = null;
- return temp;
- }
+ public void stop() {
+ this.traverser = null;
}
@Override
- public boolean hasNext() {
+ public boolean isRunning() {
return null != this.traverser;
}
@Override
- public void reset() {
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+ this.processTraverser(starts);
+ return new Iterator<>() {
+ @Override
+ public boolean hasNext() {
+ return null != traverser;
+ }
+
+ @Override
+ public Traverser<C, E> next() {
+ if (null == traverser)
+ throw FastNoSuchElementException.instance();
+ else {
+ final Traverser<C, E> temp = traverser;
+ traverser = null;
+ return temp;
+ }
+ }
+ };
+ }
+
+ @Override
+ public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+ this.processTraverser(starts);
+ if (null != this.traverser)
+ consumer.accept(this.traverser);
this.traverser = null;
}
@@ -54,4 +76,6 @@ public abstract class SimpleProcessor<C, S, E> implements Processor<C, S, E>, Pr
public <D, T, F> Processor<D, T, F> mint(final Compilation<D, T, F> compilation) {
return (Processor<D, T, F>) this;
}
+
+ protected abstract void processTraverser(final Iterator<Traverser<C, S>> starts);
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
index 5dbe4aa..386353a 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/BasicMachine.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.Bytecode;
import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.util.Collections;
import java.util.Iterator;
/**
@@ -46,7 +47,7 @@ public final class BasicMachine implements Machine {
@Override
public <C, E> Iterator<Traverser<C, E>> submit(final Bytecode<C> bytecode) {
- return Compilation.<C, Object, E>compile(bytecode).getProcessor();
+ return Compilation.<C, Object, E>compile(bytecode).getProcessor().iterator(Collections.emptyIterator());
}
public static Machine open() {
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
index cf3e221..e1b9126 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/LocalMachine.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.bytecode.compiler.SourceCompilation;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
@@ -76,8 +77,8 @@ public final class LocalMachine implements Machine {
final UUID sourceId = LocalMachine.getSourceId(bytecode).orElse(null);
final SourceCompilation<C> source = (SourceCompilation<C>) this.sources.get(sourceId);
return null == source ?
- Compilation.<C, Object, E>compile(bytecode).getProcessor() :
- Compilation.<C, Object, E>compile(source, bytecode).getProcessor();
+ Compilation.<C, Object, E>compile(bytecode).getProcessor().iterator(Collections.emptyIterator()) :
+ Compilation.<C, Object, E>compile(source, bytecode).getProcessor().iterator(Collections.emptyIterator());
}
@Override
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
index 439f635..d1bf2df 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/util/IteratorUtils.java
@@ -75,7 +75,40 @@ public final class IteratorUtils {
return ix;
}
- public static final long count(final Iterable iterable) {
+ public static <S> Iterator<S> onLast(final Iterator<S> iterator, final Runnable onLast) {
+ return new Iterator<>() {
+ boolean lastExecuted = false;
+
+ @Override
+ public boolean hasNext() {
+ final boolean hasNext = iterator.hasNext();
+ if (!hasNext && !this.lastExecuted) {
+ this.lastExecuted = true;
+ onLast.run();
+ }
+ return hasNext;
+ }
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+
+ @Override
+ public S next() {
+ try {
+ return iterator.next();
+ } finally {
+ if (!iterator.hasNext() && !this.lastExecuted) {
+ this.lastExecuted = true;
+ onLast.run();
+ }
+ }
+ }
+ };
+ }
+
+ public static long count(final Iterable iterable) {
return IteratorUtils.count(iterable.iterator());
}
diff --git a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
index ee235a7..20007c4 100644
--- a/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
+++ b/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java
@@ -50,12 +50,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
import org.apache.tinkerpop.machine.traverser.species.EmptyTraverser;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@@ -85,25 +87,46 @@ public class Beam<C, S, E> implements Processor<C, S, E> {
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
- // TODO: use side-inputs
+ public void stop() {
+ try {
+ if (null != this.pipelineResult)
+ this.pipelineResult.cancel();
+ } catch (final IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ this.pipelineResult = null;
+ this.iterator = null;
}
@Override
- public Traverser<C, E> next() {
- this.setupPipeline();
- return this.iterator.next();
+ public boolean isRunning() {
+ return null != this.pipelineResult && !this.pipelineResult.getState().isTerminal();
}
@Override
- public boolean hasNext() {
- this.setupPipeline();
- return this.iterator.hasNext();
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) { // TODO: use side-inputs for starts
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+ return new Iterator<>() {
+ @Override
+ public boolean hasNext() {
+ setupPipeline();
+ return iterator.hasNext();
+ }
+
+ @Override
+ public Traverser<C, E> next() {
+ setupPipeline();
+ return iterator.next();
+ }
+ };
}
@Override
- public void reset() {
- this.iterator = null;
+ public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) { // TODO: use side-inputs for starts
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+ // TODO: create an IteratorOutputFn and a ConsumerOutputFn and connect in setupPipeline()
}
@Override
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
index 667af99..75d66a6 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/BranchStep.java
@@ -66,13 +66,13 @@ final class BranchStep<C, S, E, M> extends AbstractStep<C, S, E> {
if (entry.getKey().filterTraverser(traverser)) {
found = true;
for (final Compilation<C, S, E> branch : entry.getValue()) {
- ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(branch.addTraverser(traverser.clone()));
+ ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(branch.getProcessor().iterator(traverser.clone()));
}
}
}
if (!found) {
for (final Compilation<C, S, E> defaultBranch : this.defaultBranches) {
- ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(defaultBranch.addTraverser(traverser.clone()));
+ ((MultiIterator<Traverser<C, E>>) this.nextTraversers).addIterator(defaultBranch.getProcessor().iterator(traverser.clone()));
}
}
}
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index e370c58..9f3c7c8 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -31,9 +31,13 @@ import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.processor.pipes.util.InMemoryReducer;
import org.apache.tinkerpop.machine.traverser.Traverser;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -43,6 +47,7 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
private final List<Step<?, ?, ?>> steps = new ArrayList<>();
private Step<C, ?, E> endStep;
private SourceStep<C, S> startStep;
+ private AtomicBoolean alive = new AtomicBoolean(Boolean.FALSE);
public Pipes(final Compilation<C, S, E> compilation) {
Step<C, ?, S> previousStep = EmptyStep.instance();
@@ -81,25 +86,45 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
- this.startStep.addStart(traverser);
+ public void stop() {
+ this.alive.set(Boolean.FALSE);
+ for (final Step<?, ?, ?> step : this.steps) {
+ step.reset();
+ }
}
@Override
- public Traverser<C, E> next() {
- return this.endStep.next();
+ public boolean isRunning() {
+ return this.alive.get();
}
@Override
- public boolean hasNext() {
- return this.endStep.hasNext();
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+ this.alive.set(Boolean.TRUE);
+ if (null != this.startStep)
+ starts.forEachRemaining(this.startStep::addStart);
+ return IteratorUtils.onLast(this.endStep, () -> this.alive.set(Boolean.FALSE));
}
+
@Override
- public void reset() {
- for (final Step<?, ?, ?> step : this.steps) {
- step.reset();
- }
+ public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+ this.alive.set(Boolean.TRUE);
+ new Thread(() -> {
+ final Iterator<Traverser<C, E>> iterator = this.iterator(starts);
+ while (iterator.hasNext()) {
+ if (!this.alive.get())
+ break;
+ consumer.accept(iterator.next());
+ }
+ }).start();
}
@Override
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
index 8dc25f3..4c5fa5d 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/RepeatStep.java
@@ -22,6 +22,10 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
+
+import java.util.Collections;
+import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -34,6 +38,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
private final Compilation<C, S, ?> untilCompilation;
private final Compilation<C, S, ?> emitCompilation;
private final Compilation<C, S, S> repeat;
+ private Iterator<Traverser<C, S>> repeatIterator = Collections.emptyIterator();
+ private TraverserSet<C, S> repeatStarts = new TraverserSet<>();
private TraverserSet<C, S> outputTraversers = new TraverserSet<>();
private TraverserSet<C, S> inputTraversers = new TraverserSet<>();
private final boolean hasStartPredicates;
@@ -72,19 +78,19 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
this.outputTraversers.add(traverser);
} else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
- this.repeat.addTraverser(traverser);
+ this.repeatStarts.add(traverser);
} else
- this.repeat.addTraverser(traverser);
+ this.repeatStarts.add(traverser);
} else if (1 == this.emitLocation) {
if (this.emitCompilation.filterTraverser(traverser))
this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
else
- this.repeat.addTraverser(traverser);
+ this.repeatStarts.add(traverser);
}
} else {
- this.repeat.addTraverser(traverser);
+ this.repeatStarts.add(traverser);
}
return true;
}
@@ -92,9 +98,9 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
}
private void stageOutput() {
- while (this.outputTraversers.isEmpty() && (this.repeat.getProcessor().hasNext() || this.stageInput())) {
- if (this.repeat.getProcessor().hasNext()) {
- final Traverser<C, S> traverser = this.repeat.getProcessor().next().repeatLoop(this.repeatBranch);
+ while (this.outputTraversers.isEmpty() && (this.repeatIterator.hasNext() || this.stageInput())) {
+ if (this.repeatIterator.hasNext()) {
+ final Traverser<C, S> traverser = this.repeatIterator.next().repeatLoop(this.repeatBranch);
if (this.hasEndPredicates) {
if (3 == this.untilLocation) {
if (this.untilCompilation.filterTraverser(traverser)) {
@@ -115,6 +121,8 @@ final class RepeatStep<C, S> extends AbstractStep<C, S, S> {
} else {
this.inputTraversers.add(traverser);
}
+ } else {
+ this.repeatIterator = this.repeat.getProcessor().iterator(IteratorUtils.removeOnNext(this.repeatStarts.iterator()));
}
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 6c478f4..d16a4e1 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -24,6 +24,10 @@ import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -31,10 +35,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
- boolean executed = false;
Disposable disposable;
+ final AtomicBoolean running = new AtomicBoolean(Boolean.FALSE);
final TraverserSet<C, S> starts = new TraverserSet<>();
- final TraverserSet<C, E> ends = new TraverserSet<>();
+ private final TraverserSet<C, E> ends = new TraverserSet<>();
final Compilation<C, S, E> compilation;
AbstractRxJava(final Compilation<C, S, E> compilation) {
@@ -42,35 +46,62 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
}
@Override
- public void addStart(final Traverser<C, S> traverser) {
- this.starts.add(traverser);
+ public void stop() {
+ if (null != this.disposable) {
+ this.disposable.dispose();
+ this.disposable = null;
+ }
+ this.starts.clear();
+ this.ends.clear();
+ this.running.set(Boolean.FALSE);
}
@Override
- public Traverser<C, E> next() {
- this.prepareFlow();
- return this.ends.remove();
+ public boolean isRunning() {
+ return this.running.get();
}
@Override
- public boolean hasNext() {
- this.prepareFlow();
- return !this.ends.isEmpty();
+ public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+ this.running.set(Boolean.TRUE);
+ this.starts.clear();
+ this.ends.clear();
+ starts.forEachRemaining(this.starts::add);
+ this.prepareFlow(this.ends::add);
+ return new Iterator<>() {
+ @Override
+ public boolean hasNext() {
+ waitForCompletionOrResult();
+ return !ends.isEmpty();
+ }
+
+ @Override
+ public Traverser<C, E> next() {
+ waitForCompletionOrResult();
+ return ends.remove();
+ }
+ };
}
@Override
- public void reset() {
- if (null != this.disposable)
- this.disposable.dispose();
+ public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+ if (this.isRunning())
+ throw Processor.Exceptions.processorIsCurrentlyRunning(this);
+
+ this.running.set(Boolean.TRUE);
this.starts.clear();
this.ends.clear();
- this.executed = false;
+ starts.forEachRemaining(this.starts::add);
+ this.prepareFlow(consumer::accept);
}
- protected abstract void prepareFlow();
+ protected abstract void prepareFlow(final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer);
- void waitForCompletionOrResult() {
- while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
+ private void waitForCompletionOrResult() {
+ while (this.ends.isEmpty() && this.isRunning()) {
// wait until either the flow is complete or there is a traverser result
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 085168a..b0d92b1 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.machine.processor.rxjava;
import io.reactivex.Flowable;
+import io.reactivex.functions.Consumer;
import io.reactivex.parallel.ParallelFlowable;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
@@ -33,6 +34,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.function.ReduceFunction;
import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
import org.apache.tinkerpop.machine.util.IteratorUtils;
@@ -63,22 +65,19 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
}
@Override
- protected void prepareFlow() {
- if (!this.executed) {
- this.executed = true;
- this.disposable = this.flowable.
- doOnNext(this.ends::add).
- sequential().
- doFinally(() -> {
- if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
- this.threadPool.shutdown();
- RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId);
- }
- }).
- subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
-
- }
- this.waitForCompletionOrResult();
+ protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) { // subscribes the parallel flow and keeps the subscription handle in this.disposable
+ this.disposable = this.flowable
+ .doOnNext(consumer) // attached before sequential(), i.e. on the parallel flow - NOTE(review): presumably the consumer can be invoked from several threads; confirm
+ .sequential()
+ .subscribeOn(Schedulers.newThread()) // don't block the execution so results can be streamed back in real-time
+ .doFinally(() -> {
+ if (null != this.bytecodeId) { // only the parent compilation should close the thread pool
+ RxJavaProcessor.THREAD_POOLS.remove(this.bytecodeId); // drop the registry entry for this bytecode id, then shut the pool down
+ this.threadPool.shutdown();
+ }
+ this.running.set(Boolean.FALSE); // mark the processor as no longer running (presumably what isRunning() reports - confirm)
+ })
+ .subscribe();
}
// EXECUTION PLAN COMPILER
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index ee04fb3..0d4cbce 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.machine.processor.rxjava;
import io.reactivex.Flowable;
+import io.reactivex.functions.Consumer;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
@@ -31,6 +32,7 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.function.ReduceFunction;
import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
+import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
import org.apache.tinkerpop.machine.util.IteratorUtils;
@@ -54,14 +56,11 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
}
@Override
- protected void prepareFlow() {
- if (!this.executed) {
- this.executed = true;
- this.disposable = this.flowable.
- doOnNext(this.ends::add).
- subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
- }
- this.waitForCompletionOrResult();
+ protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) { // subscribes the flow with the given consumer and keeps the subscription handle in this.disposable
+ this.disposable = this.flowable
+ .subscribeOn(Schedulers.newThread()) // don't block the execution so results can be streamed back in real-time
+ .doFinally(() -> this.running.set(Boolean.FALSE)) // mark the processor as no longer running once the flow finishes (presumably what isRunning() reports - confirm)
+ .subscribe(consumer);
}
// EXECUTION PLAN COMPILER