You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/11 15:44:46 UTC

[tinkerpop] branch tp4 updated: Figured out how to compile a Flow once and reuse it over and over again. This is great for nested traversals where a single traverser is inserted and result is returned and this happens over and over again for each incoming traverser. By 'caching' the Flow, we save on compilation costs.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 11ceb5b  Figured out how to compile a Flow once and reuse it over and over again. This is great for nested traversals where a single traverser is inserted and result is returned and this happens over and over again for each incoming traverser. By 'caching' the Flow, we save on compilation costs.
11ceb5b is described below

commit 11ceb5baa725384893aa05d5c71bbaadc9ee604d
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Thu Apr 11 08:09:16 2019 -0600

    Figured out how to compile a Flow once and reuse it over and over again. This is great for nested traversals where a single traverser is inserted and result is returned and this happens over and over again for each incoming traverser. By 'caching' the Flow, we save on compilation costs.
---
 .../apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java   | 3 ++-
 .../apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java   | 5 ++++-
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java | 6 +++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
index c9b35c3..6c478f4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java
@@ -60,9 +60,10 @@ public abstract class AbstractRxJava<C, S, E> implements Processor<C, S, E> {
 
     @Override
     public void reset() {
+        if (null != this.disposable)
+            this.disposable.dispose();
         this.starts.clear();
         this.ends.clear();
-        this.disposable = null;
         this.executed = false;
     }
 
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index d278cbd..c1cda6f 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -50,6 +50,7 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
     private final ExecutorService threadPool;
     private final String bytecodeId;
+    private final ParallelFlowable<Traverser<C, E>> flowable;
 
     ParallelRxJava(final Compilation<C, S, E> compilation, final ExecutorService threadPool) {
         super(compilation);
@@ -57,13 +58,15 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
         this.bytecodeId = compilation.getBytecode().getParent().isEmpty() ?
                 (String) BytecodeUtil.getSourceInstructions(compilation.getBytecode(), RxJavaProcessor.RX_ROOT_BYTECODE_ID).get(0).args()[0] :
                 null;
+        // compile once and use many times
+        this.flowable = this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)), this.compilation);
     }
 
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.disposable = this.compile(ParallelFlowable.from(Flowable.fromIterable(this.starts)).runOn(Schedulers.from(this.threadPool)), this.compilation).
+            this.disposable = this.flowable.
                     doOnNext(this.ends::add).
                     sequential().
                     doFinally(() -> {
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index e68b2ad..5da1091 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -45,15 +45,19 @@ import java.util.Map;
  */
 public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
 
+    private final Flowable<Traverser<C, E>> flowable;
+
     SerialRxJava(final Compilation<C, S, E> compilation) {
         super(compilation);
+        // compile once and reuse many times
+        this.flowable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation);
     }
 
     @Override
     protected void prepareFlow() {
         if (!this.executed) {
             this.executed = true;
-            this.disposable = SerialRxJava.compile(Flowable.fromIterable(this.starts), this.compilation).
+            this.disposable = this.flowable.
                     doOnNext(this.ends::add).
                     subscribeOn(Schedulers.newThread()).subscribe(); // don't block the execution so results can be streamed back in real-time
         }