You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/08 22:27:56 UTC
[tinkerpop] branch tp4 updated: was able to remove a sequential
conversion in ParallelRxJava repeat().
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 5b64d7b was able to remove a sequential conversion in ParallelRxJava repeat().
5b64d7b is described below
commit 5b64d7b715218b422402392bff7b6249b25f5b54
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 16:27:47 2019 -0600
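was able to remove a sequential conversion in ParallelRxJava repeat().
The sequential().flatMapIterable(...).parallel().runOn(...) round trips are replaced with ParallelFlowable.flatMap(t -> Flowable.fromIterable(...)), both in the FlatMapFunction branch and in the RepeatStart/RepeatEnd selector flows.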
---
.../tinkerpop/machine/processor/rxjava/ParallelRxJava.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
index 75bb398..2ee8b35 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
@@ -90,7 +90,8 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
else if (function instanceof FilterFunction) {
return (ParallelFlowable) flow.filter(new FilterFlow<>((FilterFunction<C, S>) function));
} else if (function instanceof FlatMapFunction) {
- return flow.sequential().flatMapIterable(new FlatMapFlow<>((FlatMapFunction<C, S, E>) function)).parallel().runOn(Schedulers.from(this.threadPool));
+ final FlatMapFlow<C, S, E> flatMapFlow = new FlatMapFlow<>((FlatMapFunction<C, S, E>) function);
+ return flow.flatMap(t -> Flowable.fromIterable(flatMapFlow.apply(t)));
} else if (function instanceof InitialFunction) {
return Flowable.fromIterable(() -> IteratorUtils.map(((InitialFunction<C, E>) function).get(), s -> traverserFactory.create(function, s))).parallel().runOn(Schedulers.from(this.threadPool));
} else if (function instanceof ReduceFunction) {
@@ -120,13 +121,16 @@ public final class ParallelRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
ParallelFlowable<List> selectorFlow;
for (int i = 0; i < MAX_REPETITIONS; i++) {
if (repeatBranch.hasStartPredicates()) {
- selectorFlow = flow.sequential().flatMapIterable(new RepeatStart<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
+ final RepeatStart<C, S> repeatStart = new RepeatStart<>(repeatBranch);
+ selectorFlow = flow.flatMap(t -> Flowable.fromIterable(repeatStart.apply(t)));
outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
flow = this.compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), (Compilation) repeatBranch.getRepeat());
} else
flow = this.compile(flow, (Compilation) repeatBranch.getRepeat());
- selectorFlow = flow.sequential().flatMapIterable(new RepeatEnd<>(repeatBranch)).parallel().runOn(Schedulers.from(this.threadPool));
- outputs.add(selectorFlow.sequential().filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+ ///
+ final RepeatEnd<C, S> repeatEnd = new RepeatEnd<>(repeatBranch);
+ selectorFlow = flow.flatMap(t -> Flowable.fromIterable(repeatEnd.apply(t)));
+ outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)).sequential());
flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
}
return (ParallelFlowable) PublishProcessor.merge(outputs).parallel().runOn(Schedulers.from(this.threadPool));