You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/10 20:01:17 UTC

[tinkerpop] branch tp4 updated: consistent terminology between various packages. minor generalization in RxJava. Bout to dive into a new repeat() implementation for RxJava. Wish me luck.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 4df7443  consistent terminology between various packages. minor generalization in RxJava. Bout to dive into a new repeat() implementation for RxJava. Wish me luck.
4df7443 is described below

commit 4df7443b704d0685bc02c9b7f6afc4a63c651c49
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 10 08:23:09 2019 -0600

    consistent terminology between various packages. minor generalization in RxJava. Bout to dive into a new repeat() implementation for RxJava. Wish me luck.
---
 .../apache/tinkerpop/language/gremlin/AbstractTraversal.java | 12 ++++++------
 .../tinkerpop/machine/processor/rxjava/AbstractRxJava.java   |  8 ++++++--
 .../tinkerpop/machine/processor/rxjava/ParallelRxJava.java   |  8 ++------
 .../tinkerpop/machine/processor/rxjava/SerialRxJava.java     |  4 +---
 4 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
index f65279d..51f7e08 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
@@ -37,7 +37,7 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
     protected final Bytecode<C> bytecode;
     protected Coefficient<C> currentCoefficient;
     private Iterator<Traverser<C, E>> traversers = null;
-    private boolean locked = false;
+    private boolean executed = false;
 
     // iteration helpers
     private long lastCount = 0L;
@@ -52,16 +52,16 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
     ///////
 
     protected final <A, B> Traversal<C, A, B> addInstruction(final String op, final Object... args) {
-        if (this.locked)
-            throw new IllegalStateException("The traversal has already been compiled and can no longer be mutated");
+        if (this.executed)
+            throw new IllegalStateException("The traversal has already been submitted and can no longer be mutated");
         this.bytecode.addInstruction(this.currentCoefficient, op, args);
         this.currentCoefficient.unity();
         return (Traversal<C, A, B>) this;
     }
 
-    private final void prepareTraversal() {
-        if (!this.locked) {
-            this.locked = true;
+    private void prepareTraversal() {
+        if (!this.executed) {
+            this.executed = true;
             this.traversers = this.machine.submit(this.bytecode);
         }
     }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 22a6e9e..c9b35c3 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.processor.Processor;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 import org.apache.tinkerpop.machine.traverser.TraverserSet;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -69,4 +67,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
     }
 
     protected abstract void prepareFlow();
+
+    void waitForCompletionOrResult() {
+        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
+            // wait until either the flow is complete or there is a traverser result
+        }
+    }
 }
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 8828e5e..5a6c26a 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -63,9 +63,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.disposable = this.compile(
-                    ParallelFlowable.from(Flowable.fromIterable(this.starts)).
-                            runOn(Schedulers.from(this.threadPool)), this.compilation).
+            this.disposable = this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)), this.compilation).
                     doOnNext(this.ends::add).
                     sequential().
                     doFinally(() -> {
@@ -77,9 +75,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                     subscribe(); // don't block the execution so results can be streamed back in real-time
 
         }
-        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
-            // only return if there is a result ready from the flow (or the flow is dead)
-        }
+        this.waitForCompletionOrResult();
     }
 
     // EXECUTION PLAN COMPILER
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 87d2c97..c6d2f41 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -56,9 +56,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
                     doOnNext(this.ends::add).
                     subscribe(); // don't block the execution so results can be streamed back in real-time
         }
-        while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
-            // only return if there is a result ready from the flow (or the flow is dead)
-        }
+        this.waitForCompletionOrResult();
     }
 
     // EXECUTION PLAN COMPILER