You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/08 16:52:23 UTC

[tinkerpop] branch tp4 updated: added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.

This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new d35228c  added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.
d35228c is described below

commit d35228caec1fae2932ce615d2df278cbfea5ef61
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 10:52:12 2019 -0600

    added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.
---
 .../language/gremlin/AbstractTraversal.java        | 12 ++++
 .../tinkerpop/language/gremlin/Traversal.java      |  2 +
 .../tinkerpop/machine/function/FilterFunction.java |  2 +
 .../machine/function/FlatMapFunction.java          |  2 +
 .../tinkerpop/machine/function/MapFunction.java    |  2 +
 .../machine/function/filter/IdentityFilter.java    |  7 +-
 .../machine/function/flatmap/UnfoldFlatMap.java    |  8 ++-
 .../machine/function/map/ConstantMap.java          |  6 ++
 .../tinkerpop/machine/function/map/IncrMap.java    |  7 +-
 .../tinkerpop/machine/function/map/LoopsMap.java   |  7 +-
 .../machine/processor/beam/SimpleRemoteTest.java   |  7 +-
 java/machine/processor/rxjava/pom.xml              |  6 ++
 .../machine/processor/rxjava/FilterFlow.java       |  8 +--
 .../machine/processor/rxjava/FlatMapFlow.java      |  8 +--
 .../machine/processor/rxjava/MapFlow.java          |  8 +--
 .../machine/processor/rxjava/RxJavaBenchmark.java  | 78 ++++++++++++++++++++++
 16 files changed, 149 insertions(+), 21 deletions(-)

diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
index 3d2811c..f65279d 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -102,6 +103,17 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
     }
 
     @Override
+    public void iterate() {
+        try {
+            while (true) {
+                this.nextTraverser();
+            }
+        } catch (final NoSuchElementException e) {
+            // do nothing
+        }
+    }
+
+    @Override
     public String toString() {
         return this.bytecode.toString();
     }
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
index 98b5d9a..f0fa839 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
@@ -119,4 +119,6 @@ public interface Traversal<C, S, E> extends Iterator<E> {
 
     public Traverser<C, E> nextTraverser();
 
+    public void iterate();
+
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
index a8bcb7b..cd0653e 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
@@ -26,4 +26,6 @@ import java.util.function.Predicate;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface FilterFunction<C, S> extends Predicate<Traverser<C, S>>, CFunction<C> {
+
+    public FilterFunction<C, S> clone();
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
index 573af55..4a2aa8e 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
@@ -27,4 +27,6 @@ import java.util.function.Function;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface FlatMapFunction<C, S, E> extends Function<Traverser<C, S>, Iterator<E>>, CFunction<C> {
+
+    public FlatMapFunction<C, S, E> clone();
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
index e18738a..6d4d254 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
@@ -27,4 +27,6 @@ import java.util.function.Function;
  */
 public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, E>, CFunction<C> {
 
+    public MapFunction<C, S, E> clone();
+
 }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
index f71c14c..07639e1 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.FilterFunction;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
-import java.util.Set;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -40,6 +38,11 @@ public final class IdentityFilter<C, S> extends AbstractFunction<C> implements F
         return true;
     }
 
+    @Override
+    public IdentityFilter<C, S> clone() {
+        return (IdentityFilter<C, S>) super.clone();
+    }
+
     public static <C, S> IdentityFilter<C, S> compile(final Instruction<C> instruction) {
         return new IdentityFilter<>(instruction.coefficient(), instruction.label());
     }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
index cb44d61..3d34bb2 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
@@ -29,14 +29,13 @@ import org.apache.tinkerpop.machine.util.IteratorUtils;
 import java.lang.reflect.Array;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements FlatMapFunction<C, S, E> {
 
-    public UnfoldFlatMap(final Coefficient<C> coefficient, final String label) {
+    private UnfoldFlatMap(final Coefficient<C> coefficient, final String label) {
         super(coefficient, label);
     }
 
@@ -55,6 +54,11 @@ public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements
             return IteratorUtils.of((E) object);
     }
 
+    @Override
+    public UnfoldFlatMap<C, S, E> clone() {
+        return (UnfoldFlatMap<C, S, E>) super.clone();
+    }
+
     private final Iterator<E> handleArrays(final Object array) {
         if (array instanceof Object[]) {
             return new ArrayIterator<>((E[]) array);
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
index 2abe342..ba462b2 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
@@ -52,7 +52,13 @@ public final class ConstantMap<C, S, E> extends AbstractFunction<C> implements M
         return StringFactory.makeFunctionString(this, this.constant);
     }
 
+    @Override
+    public ConstantMap<C, S, E> clone() {
+        return (ConstantMap<C, S, E>) super.clone();
+    }
+
     public static <C, S, E> ConstantMap<C, S, E> compile(final Instruction<C> instruction) {
         return new ConstantMap<>(instruction.coefficient(), instruction.label(), (E) instruction.args()[0]);
     }
+
 }
\ No newline at end of file
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
index e4a9ad5..735206c 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
-import java.util.Set;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -40,6 +38,11 @@ public final class IncrMap<C> extends AbstractFunction<C> implements MapFunction
         return traverser.object() + 1L;
     }
 
+    @Override
+    public IncrMap<C> clone() {
+        return (IncrMap<C>) super.clone();
+    }
+
     public static <C> IncrMap<C> compile(final Instruction<C> instruction) {
         return new IncrMap<>(instruction.coefficient(), instruction.label());
     }
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
index f7f04e8..2e332fe 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
-import java.util.Set;
-
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -40,6 +38,11 @@ public final class LoopsMap<C, S> extends AbstractFunction<C> implements MapFunc
         return traverser.loops();
     }
 
+    @Override
+    public LoopsMap<C, S> clone() {
+        return (LoopsMap<C, S>) super.clone();
+    }
+
     public static <C, S> LoopsMap<C, S> compile(final Instruction<C> instruction) {
         return new LoopsMap<>(instruction.coefficient(), instruction.label());
     }
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
index 322c8e9..b65460f 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
@@ -25,6 +25,8 @@ import org.apache.tinkerpop.machine.species.remote.MachineServer;
 import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
 import org.junit.jupiter.api.AfterAll;
 
+import java.util.Map;
+
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
@@ -34,7 +36,10 @@ public class SimpleRemoteTest extends SimpleTestSuite {
     private static MachineServer SERVER = new MachineServer(7777);
 
     static {
-        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, BeamProcessor.class);
+        BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR,
+                BeamProcessor.class,
+                Map.of(BeamProcessor.TRAVERSER_SERVER_LOCATION,
+                        "localhost", BeamProcessor.TRAVERSER_SERVER_PORT, 6666));
     }
 
     SimpleRemoteTest() {
diff --git a/java/machine/processor/rxjava/pom.xml b/java/machine/processor/rxjava/pom.xml
index 1c5c276..c699b30 100644
--- a/java/machine/processor/rxjava/pom.xml
+++ b/java/machine/processor/rxjava/pom.xml
@@ -41,6 +41,12 @@ limitations under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.tinkerpop</groupId>
+            <artifactId>pipes</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
index ec686f7..fe02ffe 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> {
 
-    private final ThreadLocal<FilterFunction<C, S>> function;
+    private final ThreadLocal<FilterFunction<C, S>> filterFunction;
 
-    FilterFlow(final FilterFunction<C, S> function) {
-        this.function = ThreadLocal.withInitial(() -> (FilterFunction) function.clone());
+    FilterFlow(final FilterFunction<C, S> filterFunction) {
+        this.filterFunction = ThreadLocal.withInitial(filterFunction::clone);
     }
 
     @Override
     public boolean test(final Traverser<C, S> traverser) {
-        return this.function.get().test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
+        return this.filterFunction.get().test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
     }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
index 2d1cd9b..aef4238 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> {
 
-    private ThreadLocal<FlatMapFunction<C, S, E>> function;
+    private ThreadLocal<FlatMapFunction<C, S, E>> flatMapFunction;
 
-    FlatMapFlow(final FlatMapFunction<C, S, E> function) {
-        this.function = ThreadLocal.withInitial(() -> (FlatMapFunction) function.clone());
+    FlatMapFlow(final FlatMapFunction<C, S, E> flatMapFunction) {
+        this.flatMapFunction = ThreadLocal.withInitial(flatMapFunction::clone);
     }
 
     @Override
     public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
-        return () -> traverser.flatMap(this.function.get());
+        return () -> traverser.flatMap(this.flatMapFunction.get());
     }
 }
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
index 57e934e..33e2659 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
  */
 final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, E>> {
 
-    private final ThreadLocal<MapFunction<C, S, E>> function;
+    private final ThreadLocal<MapFunction<C, S, E>> mapFunction;
 
-    MapFlow(final MapFunction<C, S, E> function) {
-        this.function = ThreadLocal.withInitial(() -> (MapFunction)function.clone());
+    MapFlow(final MapFunction<C, S, E> mapFunction) {
+        this.mapFunction = ThreadLocal.withInitial(mapFunction::clone);
     }
 
     @Override
     public Traverser<C, E> apply(final Traverser<C, S> traverser) {
-        return traverser.map(this.function.get());
+        return traverser.map(this.mapFunction.get());
     }
 }
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
new file mode 100644
index 0000000..1051c47
--- /dev/null
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.language.gremlin.Gremlin;
+import org.apache.tinkerpop.language.gremlin.TraversalSource;
+import org.apache.tinkerpop.language.gremlin.common.__;
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.processor.pipes.PipesProcessor;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+class RxJavaBenchmark {
+
+    @Test
+    public void benchmark() {
+        final Machine machine = LocalMachine.open();
+        final TraversalSource ser = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
+        final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
+        final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
+        final List<Long> input = new ArrayList<>(1000);
+        for (long i = 0; i < 100000; i++) {
+            input.add(i+1);
+        }
+        final int runs = 30;
+        System.out.println("Threads used: " + (Runtime.getRuntime().availableProcessors() - 1));
+        System.out.println("Input size: " + input.size());
+        float serTime = 0.0f;
+        float parTime = 0.0f;
+        float pipTime = 0.0f;
+        for (int i = 0; i < runs; i++) {
+            String source = "ser";
+            for (final TraversalSource g : List.of(ser, par, pipes)) {
+                final long time = System.currentTimeMillis();
+                g.inject(input).unfold().repeat(__.incr()).times(4).iterate();
+                if (i > 1) {
+                    if ("ser".equals(source)) {
+                        serTime = serTime + (System.currentTimeMillis() - time);
+                        source = "par";
+                    } else if ("par".equals(source)) {
+                        parTime = parTime + (System.currentTimeMillis() - time);
+                        source = "pip";
+                    } else {
+                        pipTime = pipTime + (System.currentTimeMillis() - time);
+                        source = "ser";
+                    }
+                }
+
+            }
+        }
+        System.out.println("Average time [seri]: " + serTime / runs);
+        System.out.println("Average time [para]: " + parTime / runs);
+        System.out.println("Average time [pipe]: " + pipTime / runs);
+    }
+}