You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/09 02:07:39 UTC

[tinkerpop] branch tp4 updated: added a Thread.sleep() in the test suite to make sure the servers are shut down fully between tests. Learned about share() for branching in Flowable. Added it to SerialRxJava.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 3b452af  added a Thread.sleep() in the test suite to make sure the servers are shut down fully between tests. Learned about share() for branching in Flowable. Added it to SerialRxJava.
3b452af is described below

commit 3b452af20116bdfb61252d5e8ea75bf3b40c35ca
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 20:07:31 2019 -0600

    added a Thread.sleep() in the test suite to make sure the servers are shut down fully between tests. Learned about share() for branching in Flowable. Added it to SerialRxJava.
---
 .../apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java    | 5 +++++
 .../apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java   | 5 +++++
 .../org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java  | 4 ++--
 .../tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java   | 5 +++++
 4 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
index b65460f..5b3448a 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
@@ -49,6 +49,11 @@ public class SimpleRemoteTest extends SimpleTestSuite {
     @AfterAll
     static void stopServer() {
         SERVER.close();
+        try {
+            Thread.sleep(10);
+        } catch (final InterruptedException e) {
+
+        }
     }
 
 }
\ No newline at end of file
diff --git a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
index 63c79af..5e0b32b 100644
--- a/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
+++ b/java/machine/processor/pipes/src/test/java/org/apache/tinkerpop/machine/processor/pipes/SimpleRemoteTest.java
@@ -44,6 +44,11 @@ class SimpleRemoteTest extends SimpleTestSuite {
     @AfterAll
     static void stopServer() {
         SERVER.close();
+        try {
+            Thread.sleep(10);
+        } catch (final InterruptedException e) {
+
+        }
     }
 
 }
\ No newline at end of file
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
index 5331ee3..ace99b4 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
@@ -90,7 +90,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             final BarrierFunction<C, S, E, B> barrierFunction = (BarrierFunction<C, S, E, B>) function;
             return flow.reduce(barrierFunction.getInitialValue(), new Barrier<>(barrierFunction)).toFlowable().flatMapIterable(new BarrierFlow<>(barrierFunction, traverserFactory));
         } else if (function instanceof BranchFunction) {
-            final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function));
+            final Flowable<List> selectorFlow = flow.map(new BranchFlow<>((BranchFunction<C, S, B>) function)).share();
             final List<Publisher<Traverser<C, E>>> branchFlows = new ArrayList<>();
             int branchCounter = 0;
             for (final Map.Entry<Compilation<C, S, ?>, List<Compilation<C, S, E>>> branches : ((BranchFunction<C, S, E>) function).getBranches().entrySet()) {
@@ -110,7 +110,7 @@ public final class SerialRxJava<C, S, E> extends AbstractRxJava<C, S, E> {
             Flowable<List> selectorFlow;
             for (int i = 0; i < MAX_REPETITIONS; i++) {
                 if (repeatBranch.hasStartPredicates()) {
-                    selectorFlow = flow.flatMapIterable(new RepeatStart<>(repeatBranch));
+                    selectorFlow = flow.flatMapIterable(new RepeatStart<>(repeatBranch)).share();
                     outputs.add(selectorFlow.filter(list -> list.get(0).equals(0)).map(list -> (Traverser<C, S>) list.get(1)));
                     flow = compile(selectorFlow.filter(list -> list.get(0).equals(1)).map(list -> (Traverser<C, S>) list.get(1)), repeatBranch.getRepeat());
                 } else
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
index a8d1e83..d6ba73f 100644
--- a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/SimpleRemoteSerialTest.java
@@ -44,6 +44,11 @@ public class SimpleRemoteSerialTest extends SimpleTestSuite {
     @AfterAll
     static void stopServer() {
         SERVER.close();
+        try {
+            Thread.sleep(10);
+        } catch (final InterruptedException e) {
+
+        }
     }
 
 }
\ No newline at end of file