You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/04 15:09:15 UTC

[tinkerpop] branch tp4 updated: figured out the hanging bug with RxJava. It doesn't like lots of merges and the repitition-based repeat model can hang. Dropped down the repititions for now, but will need to figure out how to do real looping in RxJava. Exposed more test cases.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 3ff208d  figured out the hanging bug with RxJava. It doesn't like lots of merges and the repitition-based repeat model can hang. Dropped down the repititions for now, but will need to figure out how to do real looping in RxJava. Exposed more test cases.
3ff208d is described below

commit 3ff208d72afa9660c856b8e0449f20e453967ac4
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Thu Apr 4 09:09:07 2019 -0600

    figured out the hanging bug with RxJava. It doesn't like lots of merges and the repitition-based repeat model can hang. Dropped down the repititions for now, but will need to figure out how to do real looping in RxJava. Exposed more test cases.
---
 .../org/apache/tinkerpop/machine/SimpleTestSuite.java   |  4 ++--
 .../tinkerpop/machine/processor/pipes/PipesTest.java    | 17 -----------------
 .../tinkerpop/machine/processor/rxjava/RxJava.java      | 11 ++++++-----
 3 files changed, 8 insertions(+), 24 deletions(-)

diff --git a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
index 4b88855..b53577b 100644
--- a/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
+++ b/java/machine/machine-test/src/main/java/org/apache/tinkerpop/machine/SimpleTestSuite.java
@@ -213,13 +213,13 @@ public class SimpleTestSuite extends AbstractTestSuite<Long> {
                 g.inject(1L, 2L, 3L).repeat(__.<Long>incr().flatMap(choose(is(lt(8L)), incr()))).times(3));
     }
 
-    //@Test
+    @Test
     void g_injectX1X_repeatXunionXincr__incr_incrXX_timesX1X() {
         verify(List.of(2L, 3L),
                 g.inject(1L).repeat(union(incr(), __.<Long>incr().incr())).times(1));
     }
 
-    //@Test
+    @Test
     void g_injectX1X_repeatXunionXincr__incr_incrXX_timesX2X() {
         verify(List.of(3L, 4L, 4L, 5L),
                 g.inject(1L).repeat(union(incr(), __.<Long>incr().incr())).times(2));
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
index 1f0c37c..8d94250 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/PipesTest.java
@@ -24,7 +24,6 @@ import org.apache.tinkerpop.language.gremlin.TraversalSource;
 import org.apache.tinkerpop.language.gremlin.TraversalUtil;
 import org.apache.tinkerpop.language.gremlin.common.__;
 import org.apache.tinkerpop.machine.Machine;
-import org.apache.tinkerpop.machine.bytecode.compiler.Order;
 import org.apache.tinkerpop.machine.coefficient.LongCoefficient;
 import org.apache.tinkerpop.machine.species.LocalMachine;
 import org.apache.tinkerpop.machine.strategy.optimization.IdentityStrategy;
@@ -53,20 +52,4 @@ class PipesTest {
         System.out.println("\n----------\n");
     }
 
-    @Test
-    void testOrder() {
-        final Machine machine = LocalMachine.open();
-        final TraversalSource<Long> g = Gremlin.<Long>traversal(machine)
-                .withCoefficient(LongCoefficient.class)
-                .withProcessor(PipesProcessor.class)
-                .withStrategy(IdentityStrategy.class);
-
-        Traversal<Long, ?, ?> traversal = g.inject(7L, 3L, 5L, 20L, 1L, 2L).incr().order().by(Order.desc);
-        System.out.println(TraversalUtil.getBytecode(traversal));
-        System.out.println(traversal);
-        System.out.println(TraversalUtil.getBytecode(traversal));
-        System.out.println(traversal.toList());
-        System.out.println("\n----------\n");
-    }
-
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
index b7eac76..214004f 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJava.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class RxJava<C, S, E> implements Processor<C, S, E> {
 
-    private static final int MAX_REPETITIONS = 15; // TODO: this needs to be a dynamic configuration
+    private static final int MAX_REPETITIONS = 8; // TODO: this needs to be a dynamic configuration
 
     private final AtomicBoolean alive = new AtomicBoolean(Boolean.TRUE);
     private boolean executed = false;
@@ -127,12 +127,13 @@ public final class RxJava<C, S, E> implements Processor<C, S, E> {
         } else if (function instanceof BranchFunction) {
             final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function));
             final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>();
+            int branchCounter = 0;
             for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) {
-                for (int i = 0; i < branches.getValue().size(); i++) {
-                    final Compilation<C, S, E> branch = branches.getValue().get(i);
-                    final int id = i;
+                final int branchId = null == branches.getKey() ? -1 : branchCounter;
+                branchCounter++;
+                for (final Compilation<C, S, E> branch : branches.getValue()) {
                     branchFlows.add(compile(selectorFlow.
-                                    filter(list -> list.get(0).equals(null == branches.getKey() ? -1 : id)).
+                                    filter(list -> list.get(0).equals(branchId)).
                                     map(list -> (Traverser<C, S>) list.get(1)),
                             branch));
                 }