You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/23 11:02:40 UTC

[tinkerpop] branch tp4 updated: some reorg on RxJava.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new d9a9863  some reorg on RxJava.
d9a9863 is described below

commit d9a986302a487359ebb42387bfb7a3a851ffcbca
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Tue Apr 23 05:02:31 2019 -0600

    some reorg on RxJava.
---
 .../tinkerpop/machine/processor/pipes/Pipes.java   |  1 -
 .../machine/processor/rxjava/AbstractRxJava.java   | 23 +++++++++-------------
 .../machine/processor/rxjava/ParallelRxJava.java   |  5 +++--
 .../machine/processor/rxjava/SerialRxJava.java     |  5 +++--
 4 files changed, 15 insertions(+), 19 deletions(-)

diff --git a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
index 9f3c7c8..317659f 100644
--- a/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
+++ b/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java
@@ -100,7 +100,6 @@ public final class Pipes<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
-
         if (this.isRunning())
             throw Processor.Exceptions.processorIsCurrentlyRunning(this);
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index d16a4e1..9d88651 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.compiler.Compilation;
 import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
+import org.apache.tinkerpop.machine.util.IteratorUtils;
 
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,15 +64,8 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public Iterator<Traverser<C, E>> iterator(final Iterator<Traverser<C, S>> starts) {
-        if (this.isRunning())
-            throw Processor.Exceptions.processorIsCurrentlyRunning(this);
-
-        this.running.set(Boolean.TRUE);
-        this.starts.clear();
-        this.ends.clear();
-        starts.forEachRemaining(this.starts::add);
-        this.prepareFlow(this.ends::add);
-        return new Iterator<>() {
+        this.prepareFlow(starts, this.ends::add);
+        return IteratorUtils.onLast(new Iterator<>() {
             @Override
             public boolean hasNext() {
                 waitForCompletionOrResult();
@@ -83,11 +77,15 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
                 waitForCompletionOrResult();
                 return ends.remove();
             }
-        };
+        }, this::stop);
     }
 
     @Override
     public void subscribe(final Iterator<Traverser<C, S>> starts, final Consumer<Traverser<C, E>> consumer) {
+        this.prepareFlow(starts, consumer::accept);
+    }
+
+    protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer) {
         if (this.isRunning())
             throw Processor.Exceptions.processorIsCurrentlyRunning(this);
 
@@ -95,13 +93,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
         this.starts.clear();
         this.ends.clear();
         starts.forEachRemaining(this.starts::add);
-        this.prepareFlow(consumer::accept);
     }
 
-    protected abstract void prepareFlow(final io.reactivex.functions.Consumer<? super Traverser<C, E>> consumer);
-
     private void waitForCompletionOrResult() {
-        while (this.ends.isEmpty() && this.isRunning()) {
+        while (this.ends.isEmpty() && this.running.get()) {
             // wait until either the flow is complete or there is a traverser result
         }
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index b0d92b1..3aac6a2 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -34,13 +34,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 import org.apache.tinkerpop.machine.util.IteratorUtils;
 import org.reactivestreams.Publisher;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -65,7 +65,8 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     }
 
     @Override
-    protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+    protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final Consumer<? super Traverser<C, E>> consumer) {
+        super.prepareFlow(starts, consumer);
         this.disposable = this.flowable
                 .doOnNext(consumer)
                 .sequential()
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 0d4cbce..b8d8bc4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -32,13 +32,13 @@ import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
 import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
-import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 import org.apache.tinkerpop.machine.util.IteratorUtils;
 import org.reactivestreams.Publisher;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -56,7 +56,8 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     }
 
     @Override
-    protected void prepareFlow(final Consumer<? super Traverser<C, E>> consumer) {
+    protected void prepareFlow(final Iterator<Traverser<C, S>> starts, final Consumer<? super Traverser<C, E>> consumer) {
+        super.prepareFlow(starts, consumer);
         this.disposable = this.flowable
                 .subscribeOn(Schedulers.newThread())  // don't block the execution so results can be streamed back in real-time
                 .doFinally(() -> this.running.set(Boolean.FALSE))