You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/08 22:27:56 UTC

[tinkerpop] branch tp4 updated: was able to remove a sequential conversion in ParallelRxJava repeat().

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 5b64d7b  was able to remove a sequential conversion in ParallelRxJava repeat().
5b64d7b is described below

commit 5b64d7b715218b422402392bff7b6249b25f5b54
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 16:27:47 2019 -0600

    Removed the sequential()/parallel() round-trip in ParallelRxJava's flatmap and repeat() compilation by calling flatMap() directly on the parallel flow.
---
 .../tinkerpop/machine/processor/rxjava/ParallelRxJava.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 75bb398..2ee8b35 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -90,7 +90,8 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
         else if (function instanceof FilterFunction) {
             return (ParallelFlowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function));
         } else if (function instanceof FlatMapFunction) {
-            return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel().runOn(Schedulers.from(this.threadPool));
+            final FlatMapFlow<C, S, E> flatMapFlow = new FlatMapFlow<>((FlatMapFunction<C, S, E>) function);
+            return flow.flatMap(t -> Flowable.fromIterable(flatMapFlow.apply(t)));
         } else if (function instanceof InitialFunction) {
             return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel().runOn(Schedulers.from(this.threadPool));
         } else if (function instanceof ReduceFunction) {
@@ -120,13 +121,16 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             ParallelFlowable<List> selectorFlow;
             for (int i = 0; i < MAX_REPETITIONS; i++) {
                 if (repeatBranch.hasStartPredicates()) {
-                    selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
+                    final RepeatStart<C, S> repeatStart = new RepeatStart<>(repeatBranch);
+                    selectorFlow = flow.flatMap(t -> Flowable.fromIterable(repeatStart.apply(t)));
                     outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
                     flow = this.compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), (Compilation) repeatBranch.getRepeat());
                 } else
                     flow = this.compile(flow, (Compilation) repeatBranch.getRepeat());
-                selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
-                outputs.add(selectorFlow.sequential().filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+                ///
+                final RepeatEnd<C, S> repeatEnd = new RepeatEnd<>(repeatBranch);
+                selectorFlow = flow.flatMap(t -> Flowable.fromIterable(repeatEnd.apply(t)));
+                outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
                 flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
             }
             return (ParallelFlowable) PublishProcessor.merge(outputs).parallel().runOn(Schedulers.from(this.threadPool));