You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/03 19:36:20 UTC

[tinkerpop] branch tp4 updated: got repeat() working.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new c7ee5a0  got repeat() working.
c7ee5a0 is described below

commit c7ee5a0c93f8f54509a2c82971ae6c108f2caec8
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Wed Apr 3 13:36:10 2019 -0600

    got repeat() working.
---
 .../processor/rxjava/util/TopologyUtil.java        | 91 +++++++++++++++-------
 .../machine/processor/rxjava/RxJavaTest.java       | 83 +++++++++++++++++++-
 2 files changed, 143 insertions(+), 31 deletions(-)

diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
index b6b0e4a..4146623 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/util/TopologyUtil.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.machine.function.FlatMapFunction;
 import org.apache.tinkerpop.machine.function.InitialFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.function.ReduceFunction;
+import org.apache.tinkerpop.machine.function.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.processor.rxjava.BranchFlow;
 import org.apache.tinkerpop.machine.processor.rxjava.FilterFlow;
 import org.apache.tinkerpop.machine.processor.rxjava.FlatMapFlow;
@@ -46,6 +47,8 @@ import java.util.Map;
  */
 public final class TopologyUtil {
 
+    private static final int MAX_ITERATIONS = 20;
+
     public static <C, S, E> Flowable<Traverser<C, E>> compile(final Flowable<Traverser<C, S>> source, final Compilation<C, S, E> compilation) {
         final TraverserFactory<C> traverserFactory = compilation.getTraverserFactory();
         Flowable<Traverser<C, E>> sink = (Flowable) source;
@@ -55,34 +58,6 @@ public final class TopologyUtil {
         return sink;
     }
 
-    /*
-     private final void stageInput() {
-        if (this.hasStartPredicates) {
-            final Traverser<C, S> traverser = this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove();
-            if (1 == this.untilLocation) {
-                if (this.untilCompilation.filterTraverser(traverser)) {
-                    this.outputTraversers.add(traverser);
-                } else if (2 == this.emitLocation && this.emitCompilation.filterTraverser(traverser)) {
-                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
-                    this.repeat.addTraverser(traverser);
-                } else
-                    this.repeat.addTraverser(traverser);
-            } else if (1 == this.emitLocation) {
-                if (this.emitCompilation.filterTraverser(traverser))
-                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
-                if (2 == this.untilLocation && this.untilCompilation.filterTraverser(traverser))
-                    this.outputTraversers.add(traverser.repeatDone(this.repeatBranch));
-                else
-                    this.repeat.addTraverser(traverser);
-            }
-        } else {
-            this.repeat.addTraverser(this.inputTraversers.isEmpty() ? this.previousStep.next() : this.inputTraversers.remove());
-        }
-    }
-
-
-     */
-
     private static <C, S, E, B> Flowable<Traverser<C, E>> extend(Flowable<Traverser<C, S>> flow, final CFunction<C> function, final TraverserFactory<C> traverserFactory) {
         if (function instanceof MapFunction)
             return flow.map(new MapFlow<>((MapFunction<C, S, E>) function));
@@ -114,6 +89,66 @@ public final class TopologyUtil {
                 sink = sink.mergeWith(branchFlow);
             }
             return sink;
+        } else if (function instanceof RepeatBranch) {
+            final RepeatBranch<C, S> repeatBranch = (RepeatBranch<C, S>) function;
+            final List<Publisher<Traverser<C, S>>> outputs = new ArrayList<>();
+            for (int i = 0; i < MAX_ITERATIONS; i++) {
+                Flowable<List> selectorFlow = flow.flatMapIterable(t -> {
+                    final List<List> list = new ArrayList<>();
+                    if (repeatBranch.hasStartPredicates()) {
+                        if (1 == repeatBranch.getUntilLocation()) {
+                            if (repeatBranch.getUntil().filterTraverser(t)) {
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            } else if (2 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
+                                list.add(List.of(1, t));
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            } else
+                                list.add(List.of(1, t));
+                        } else if (1 == repeatBranch.getEmitLocation()) {
+                            if (repeatBranch.getEmit().filterTraverser(t))
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            if (2 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t)) {
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            } else
+                                list.add(List.of(1, t));
+                        }
+                    } else
+                        list.add(List.of(1, t));
+                    return list;
+                });
+                outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+                flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)).publish(f -> compile(f, repeatBranch.getRepeat()));
+                selectorFlow = flow.flatMapIterable(t -> {
+                    final List<List> list = new ArrayList<>();
+                    if (repeatBranch.hasEndPredicates()) {
+                        if (3 == repeatBranch.getUntilLocation()) {
+                            if (repeatBranch.getUntil().filterTraverser(t)) {
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            } else if (4 == repeatBranch.getEmitLocation() && repeatBranch.getEmit().filterTraverser(t)) {
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                                list.add(List.of(1, t));
+                            } else
+                                list.add(List.of(1, t));
+                        } else if (3 == repeatBranch.getEmitLocation()) {
+                            if (repeatBranch.getEmit().filterTraverser(t))
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            if (4 == repeatBranch.getUntilLocation() && repeatBranch.getUntil().filterTraverser(t))
+                                list.add(List.of(0, t.repeatDone(repeatBranch)));
+                            else
+                                list.add(List.of(1, t));
+                        }
+                    } else
+                        list.add(List.of(1, t.repeatLoop(repeatBranch)));
+                    return list;
+                });
+                outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
+                flow = selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1));
+            }
+            Flowable<Traverser<C, S>> sink = flow.filter(t -> false); // branches are the only outputs
+            for (final Publisher<Traverser<C, S>> output : outputs) {
+                sink = sink.mergeWith(output);
+            }
+            return (Flowable) sink;
         }
         throw new RuntimeException("Need a new execution plan step: " + function);
     }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
index 8085bbc..74f5b63 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaTest.java
@@ -27,9 +27,13 @@ import org.apache.tinkerpop.language.gremlin.common.__;
 import org.apache.tinkerpop.machine.Machine;
 import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
 import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.apache.tinkerpop.machine.species.remote.MachineServer;
+import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
+
 import static org.apache.tinkerpop.language.gremlin.common.__.incr;
 
 /**
@@ -45,24 +49,97 @@ public class RxJavaTest {
                 .withProcessor(RxJavaProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(),__.<Long>incr().incr());
+        Traversal<Long, ?, ?> traversal = g.inject(2L).is(P.gt(1)).union(incr(), __.<Long>incr().incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(1L).choose(__.is(1L), incr(),__.<Long>incr().incr());
+        traversal = g.inject(1L).choose(__.is(1L), incr(), __.<Long>incr().incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+
+        traversal = g.inject(1L).emit().repeat(incr()).until(__.is(3L));
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
+    }
+
+    @Test
+    public void shouldWork() {
+        final MachineServer server = new MachineServer(7777);
+        final Machine machine = RemoteMachine.open(6666, "localhost", 7777);
+        final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
+                .withCoefficient(LongCoefficient.class)
+                .withProcessor(RxJavaProcessor.class)
+                .withStrategy(IdentityStrategy.class);
 
-        /*traversal = g.inject(1L).until(__.is(P.lt(3L))).emit().repeat(incr());
+        Traversal<Long, ?, ?> traversal = g.inject(List.of(1L, 1L)).<Long>unfold().map(incr()).c(4L).repeat(incr()).until(__.is(__.constant(8L).incr().incr())).sum();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(List.of(1L, 2L)).unfold().is(P.lt(__.constant(2L))).groupCount().by(__.incr());
         System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(1L).times(10).repeat(__.incr()).emit();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        /*traversal = g.inject(1L).repeat(incr()).emit(__.constant(true)).until(__.<Long, Long>loops().is(P.gt(5)));
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");*/
+        traversal = g.inject(1L).emit(__.constant(true)).until(__.is(5L)).repeat(incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(1L).until(__.is(5L)).repeat(incr()).emit(__.constant(true));
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(7L).union(__.incr(), __.<Long>incr().incr().union(__.incr(), __.incr()));
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(7L).choose(__.is(7L), __.incr()).sum();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(List.of(7L, 8L, 9L)).<Long>unfold().choose(__.is(7L), __.incr(), __.<Long>incr().incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        traversal = g.inject(7L).as("a").union(__.<Long>incr().as("b"), __.<Long>incr().incr().as("b"), __.<Long>incr().incr().incr().as("b")).path("a", "b").by(__.incr());
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.toList());
+        System.out.println("\n----------\n");
+        /*traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
+        System.out.println(TraversalUtil.getBytecode(traversal));
+        System.out.println(traversal);
+        System.out.println(traversal.hasNext());
+        System.out.println(traversal.nextTraverser());
+        System.out.println(traversal.hasNext());
+        System.out.println(traversal.nextTraverser());
+        System.out.println(traversal.hasNext());*/
+        ///
+        g.close();
+        machine.close();
+        server.close();
     }
 }