You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/03 19:36:20 UTC
[tinkerpop] branch tp4 updated: got repeat() working.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new c7ee5a0 got repeat() working.
c7ee5a0 is described below
commit c7ee5a0c93f8f54509a2c82971ae6c108f2caec8
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 3 13:36:10 2019 -0600
got repeat() working.
---
.../processor/rxjava/util/TopologyUtil.java | 91 +++++++++++++++-------
.../machine/processor/rxjava/RxJavaTest.java | 83 +++++++++++++++++++-
2 files changed, 143 insertions(+), 31 deletions(-)
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
index b6b0e4a..4146623 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.FlatMapFunction;
import org.apache.tinkerpop.machine.function.InitialFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow;
import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow;
import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow;
@@ -46,6 +47,8 @@ import java.util.Map;
*/
public final class TopologyUtil {
+ private static final int MAX_ITERATIONS = 20;
+
public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
Flowable<Traverser<C, E>> sink = (Flowable) source;
@@ -55,34 +58,6 @@ public final class TopologyUtil {
return sink;
}
- /*
- private final void stageInput() {
- if (this.hasStartPredicates) {
- final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove();
- if (1 == this.untilLocation) {
- if (this.untilCompilation.filterTraverser(traverser)) {
- this.outputTraversers.add(traverser);
- } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
- this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
- this.repeat.addTraverser(traverser);
- } else
- this.repeat.addTraverser(traverser);
- } else if (1 == this.emitLocation) {
- if (this.emitCompilation.filterTraverser(traverser))
- this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
- if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
- this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
- else
- this.repeat.addTraverser(traverser);
- }
- } else {
- this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove());
- }
- }
-
-
- */
-
private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
if (function instanceof MapFunction)
return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
@@ -114,6 +89,66 @@ public final class TopologyUtil {
sink = sink.mergeWith(branchFlow);
}
return sink;
+ } else if (function instanceof RepeatBranch) {
+ final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function;
+ final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
+ for (int i = 0; i < MAX_ITERATIONS; i++) {
+ Flowable<List> selectorFlow = flow.flatMapIterable(t -> {
+ final List<List> list = new ArrayList<>();
+ if (repeatBranch.hasStartPredicates()) {
+ if (1 == repeatBranch.getUntilLocation()) {
+ if (repeatBranch.getUntil().filterTraverser(t)) {
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ } else if (2 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
+ list.add(List.of(1, t));
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ } else
+ list.add(List.of(1, t));
+ } else if (1 == repeatBranch.getEmitLocation()) {
+ if (repeatBranch.getEmit().filterTraverser(t))
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ if (2 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t)) {
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ } else
+ list.add(List.of(1, t));
+ }
+ } else
+ list.add(List.of(1, t));
+ return list;
+ });
+ outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+ flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)).publish(f -> compile(f, repeatBranch.getRepeat()));
+ selectorFlow = flow.flatMapIterable(t -> {
+ final List<List> list = new ArrayList<>();
+ if (repeatBranch.hasEndPredicates()) {
+ if (3 == repeatBranch.getUntilLocation()) {
+ if (repeatBranch.getUntil().filterTraverser(t)) {
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ } else if (4 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ list.add(List.of(1, t));
+ } else
+ list.add(List.of(1, t));
+ } else if (3 == repeatBranch.getEmitLocation()) {
+ if (repeatBranch.getEmit().filterTraverser(t))
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ if (4 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t))
+ list.add(List.of(0, t.repeatDone(repeatBranch)));
+ else
+ list.add(List.of(1, t));
+ }
+ } else
+ list.add(List.of(1, t.repeatLoop(repeatBranch)));
+ return list;
+ });
+ outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+ flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
+ }
+ Flowable<Traverser<C, S>> sink = flow.filter(t -> false); // branches are the only outputs
+ for (final Publisher<Traverser<C, S>> output : outputs) {
+ sink = sink.mergeWith(output);
+ }
+ return (Flowable) sink;
}
throw new RuntimeException("Need a new execution plan step: " + function);
}
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
index 8085bbc..74f5b63 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
@@ -27,9 +27,13 @@ import org.apache.tinkerpop.language.gremlin.common.__;
import org.apache.tinkerpop.machine.Machine;
import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
import org.junit.jupiter.api.Test;
+import java.util.List;
+
import static org.apache.tinkerpop.language.gremlin.common.__.incr;
/**
@@ -45,24 +49,97 @@ public class RxJavaTest {
.withProcessor(RxJavaProcessor.class)
.withStrategy(IdentityStrategy.class);
- Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr());
+ Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(), __.<Long>incr().incr());
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal.toList());
System.out.println("\n----------\n");
- traversal = g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr());
+ traversal = g.inject(1L).choose(__.is(1L), incr(), __.<Long>incr().incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+
+ traversal = g.inject(1L).emit().repeat(incr()).until(__.is(3L));
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal.toList());
System.out.println("\n----------\n");
+ }
+
+ @Test
+ public void shouldWork() {
+ final MachineServer server = new MachineServer(7777);
+ final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
+ final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+ .withCoefficient(LongCoefficient.class)
+ .withProcessor(RxJavaProcessor.class)
+ .withStrategy(IdentityStrategy.class);
- /*traversal = g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr());
+ Traversal<Long, ?, ?> traversal = g.inject(List.of(1L, 1L)).<Long>unfold().map(incr()).c(4L).repeat(incr()).until(__.is(__.constant(8L).incr().incr())).sum();
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(List.of(1L, 2L)).unfold().is(P.lt(__.constant(2L))).groupCount().by(__.incr());
System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(1L).times(10).repeat(__.incr()).emit();
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ /*traversal = g.inject(1L).repeat(incr()).emit(__.constant(true)).until(__.<Long, Long>loops().is(P.gt(5)));
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
System.out.println(traversal.toList());
System.out.println("\n----------\n");*/
+ traversal = g.inject(1L).emit(__.constant(true)).until(__.is(5L)).repeat(incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(1L).until(__.is(5L)).repeat(incr()).emit(__.constant(true));
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(7L).union(__.incr(), __.<Long>incr().incr().union(__.incr(), __.incr()));
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(7L).choose(__.is(7L), __.incr()).sum();
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(List.of(7L, 8L, 9L)).<Long>unfold().choose(__.is(7L), __.incr(), __.<Long>incr().incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ traversal = g.inject(7L).as("a").union(__.<Long>incr().as("b"), __.<Long>incr().incr().as("b"), __.<Long>incr().incr().incr().as("b")).path("a", "b").by(__.incr());
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.toList());
+ System.out.println("\n----------\n");
+ /*traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
+ System.out.println(TraversalUtil.getBytecode(traversal));
+ System.out.println(traversal);
+ System.out.println(traversal.hasNext());
+ System.out.println(traversal.nextTraverser());
+ System.out.println(traversal.hasNext());
+ System.out.println(traversal.nextTraverser());
+ System.out.println(traversal.hasNext());*/
+ ///
+ g.close();
+ machine.close();
+ server.close();
}
}