You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/23 11:02:40 UTC
[tinkerpop] branch tp4 updated: some reorg on RxJava.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new d9a9863 some reorg on RxJava.
d9a9863 is described below
commit d9a986302a487359ebb42387bfb7a3a851ffcbca
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 23 05:02:31 2019 -0600
some reorg on RxJava.
---
.../tinkerpop/machine/processor/pipes/Pipes.java | 1 -
.../machine/processor/rxjava/AbstractRxJava.java | 23 +++++++++-------------
.../machine/processor/rxjava/ParallelRxJava.java | 5 +++--
.../machine/processor/rxjava/SerialRxJava.java | 5 +++--
4 files changed, 15 insertions(+), 19 deletions(-)
diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 9f3c7c8..317659f 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -100,7 +100,6 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
@Override
public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
-
if (this.isRunning())
throw Processor.Exceptions.processorIsCurrentlyRunning(this);
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index d16a4e1..9d88651 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,15 +64,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
@Override
public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
- if (this.isRunning())
- throw Processor.Exceptions.processorIsCurrentlyRunning(this);
-
- this.running.set(Boolean.TRUE);
- this.starts.clear();
- this.ends.clear();
- starts.forEachRemaining(this.starts::add);
- this.prepareFlow(this.ends::add);
- return new Iterator<>() {
+ this.prepareFlow(starts, this.ends::add);
+ return IteratorUtils.onLast(new Iterator<>() {
@Override
public boolean hasNext() {
waitForCompletionOrResult();
@@ -83,11 +77,15 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
waitForCompletionOrResult();
return ends.remove();
}
- };
+ }, this::stop);
}
@Override
public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+ this.prepareFlow(starts, consumer::accept);
+ }
+
+ protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer) {
if (this.isRunning())
throw Processor.Exceptions.processorIsCurrentlyRunning(this);
@@ -95,13 +93,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
this.starts.clear();
this.ends.clear();
starts.forEachRemaining(this.starts::add);
- this.prepareFlow(consumer::accept);
}
- protected abstract void prepareFlow(final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer);
-
private void waitForCompletionOrResult() {
- while (this.ends.isEmpty() && this.isRunning()) {
+ while (this.ends.isEmpty() && this.running.get()) {
// wait until either the flow is complete or there is a traverser result
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index b0d92b1..3aac6a2 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -34,13 +34,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.function.ReduceFunction;
import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
import org.apache.tinkerpop.machine.util.IteratorUtils;
import org.reactivestreams.Publisher;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -65,7 +65,8 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
}
@Override
- protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+ protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final Consumer<? super Traverser<C, E>> consumer) {
+ super.prepareFlow(starts, consumer);
this.disposable = this.flowable
.doOnNext(consumer)
.sequential()
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 0d4cbce..b8d8bc4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -32,13 +32,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.function.ReduceFunction;
import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserFactory;
import org.apache.tinkerpop.machine.util.IteratorUtils;
import org.reactivestreams.Publisher;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,7 +56,8 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
}
@Override
- protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+ protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final Consumer<? super Traverser<C, E>> consumer) {
+ super.prepareFlow(starts, consumer);
this.disposable = this.flowable
.subscribeOn(Schedulers.newThread()) // don't block the execution so results can be streamed back in real-time
.doFinally(() -> this.running.set(Boolean.FALSE))