You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/10 20:01:17 UTC
[tinkerpop] branch tp4 updated: consistent terminology between
various packages. minor generalization in RxJava. Bout to dive into a new
repeat() implementation for RxJava. Wish me luck.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 4df7443 consistent terminology between various packages. minor generalization in RxJava. Bout to dive into a new repeat() implementation for RxJava. Wish me luck.
4df7443 is described below
commit 4df7443b704d0685bc02c9b7f6afc4a63c651c49
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 10 08:23:09 2019 -0600
consistent terminology between various packages. minor generalization in RxJava. Bout to dive into a new repeat() implementation for RxJava. Wish me luck.
---
.../apache/tinkerpop/language/gremlin/AbstractTraversal.java | 12 ++++++------
.../tinkerpop/machine/processor/rxjava/AbstractRxJava.java | 8 ++++++--
.../tinkerpop/machine/processor/rxjava/ParallelRxJava.java | 8 ++------
.../tinkerpop/machine/processor/rxjava/SerialRxJava.java | 4 +---
4 files changed, 15 insertions(+), 17 deletions(-)
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
index f65279d..51f7e08 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
@@ -37,7 +37,7 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
protected final Bytecode<C> bytecode;
protected Coefficient<C> currentCoefficient;
private Iterator<Traverser<C, E>> traversers = null;
- private boolean locked = false;
+ private boolean executed = false;
// iteration helpers
private long lastCount = 0L;
@@ -52,16 +52,16 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
///////
protected final <A, B> Traversal<C, A, B> addInstruction(final String op, final Object... args) {
- if (this.locked)
- throw new IllegalStateException("The traversal has already been compiled and can no longer be mutated");
+ if (this.executed)
+ throw new IllegalStateException("The traversal has already been submitted and can no longer be mutated");
this.bytecode.addInstruction(this.currentCoefficient, op, args);
this.currentCoefficient.unity();
return (Traversal<C, A, B>) this;
}
- private final void prepareTraversal() {
- if (!this.locked) {
- this.locked = true;
+ private void prepareTraversal() {
+ if (!this.executed) {
+ this.executed = true;
this.traversers = this.machine.submit(this.bytecode);
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index 22a6e9e..c9b35c3 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.processor.Processor;
import org.apache.tinkerpop.machine.traverser.Traverser;
import org.apache.tinkerpop.machine.traverser.TraverserSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -69,4 +67,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
}
protected abstract void prepareFlow();
+
+ void waitForCompletionOrResult() {
+ while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
+ // wait until either the flow is complete or there is a traverser result
+ }
+ }
}
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 8828e5e..5a6c26a 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -63,9 +63,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
protected void prepareFlow() {
if (!this.executed) {
this.executed = true;
- this.disposable = this.compile(
- ParallelFlowable.from(Flowable.fromIterable(this.starts)).
- runOn(Schedulers.from(this.threadPool)), this.compilation).
+ this.disposable = this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)), this.compilation).
doOnNext(this.ends::add).
sequential().
doFinally(() -> {
@@ -77,9 +75,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
subscribe(); // don't block the execution so results can be streamed back in real-time
}
- while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
- // only return if there is a result ready from the flow (or the flow is dead)
- }
+ this.waitForCompletionOrResult();
}
// EXECUTION PLAN COMPILER
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 87d2c97..c6d2f41 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -56,9 +56,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
doOnNext(this.ends::add).
subscribe(); // don't block the execution so results can be streamed back in real-time
}
- while (!this.disposable.isDisposed() && this.ends.isEmpty()) {
- // only return if there is a result ready from the flow (or the flow is dead)
- }
+ this.waitForCompletionOrResult();
}
// EXECUTION PLAN COMPILER