You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2019/04/08 16:52:23 UTC
[tinkerpop] branch tp4 updated: added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.
This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new d35228c added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.
d35228c is described below
commit d35228caec1fae2932ce615d2df278cbfea5ef61
Author: Marko A. Rodriguez <ok...@gmail.com>
AuthorDate: Mon Apr 8 10:52:12 2019 -0600
added a benchmark test case in rxJava to compare Parallel, serial, and Pipes.
---
.../language/gremlin/AbstractTraversal.java | 12 ++++
.../tinkerpop/language/gremlin/Traversal.java | 2 +
.../tinkerpop/machine/function/FilterFunction.java | 2 +
.../machine/function/FlatMapFunction.java | 2 +
.../tinkerpop/machine/function/MapFunction.java | 2 +
.../machine/function/filter/IdentityFilter.java | 7 +-
.../machine/function/flatmap/UnfoldFlatMap.java | 8 ++-
.../machine/function/map/ConstantMap.java | 6 ++
.../tinkerpop/machine/function/map/IncrMap.java | 7 +-
.../tinkerpop/machine/function/map/LoopsMap.java | 7 +-
.../machine/processor/beam/SimpleRemoteTest.java | 7 +-
java/machine/processor/rxjava/pom.xml | 6 ++
.../machine/processor/rxjava/FilterFlow.java | 8 +--
.../machine/processor/rxjava/FlatMapFlow.java | 8 +--
.../machine/processor/rxjava/MapFlow.java | 8 +--
.../machine/processor/rxjava/RxJavaBenchmark.java | 78 ++++++++++++++++++++++
16 files changed, 149 insertions(+), 21 deletions(-)
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
index 3d2811c..f65279d 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/AbstractTraversal.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -102,6 +103,17 @@ public abstract class AbstractTraversal<C, S, E> implements Traversal<C, S, E> {
}
@Override
+ public void iterate() {
+ try {
+ while (true) {
+ this.nextTraverser();
+ }
+ } catch (final NoSuchElementException e) {
+ // do nothing
+ }
+ }
+
+ @Override
public String toString() {
return this.bytecode.toString();
}
diff --git a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
index 98b5d9a..f0fa839 100644
--- a/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
+++ b/java/language/gremlin/src/main/java/org/apache/tinkerpop/language/gremlin/Traversal.java
@@ -119,4 +119,6 @@ public interface Traversal<C, S, E> extends Iterator<E> {
public Traverser<C, E> nextTraverser();
+ public void iterate();
+
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
index a8bcb7b..cd0653e 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FilterFunction.java
@@ -26,4 +26,6 @@ import java.util.function.Predicate;
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface FilterFunction<C, S> extends Predicate<Traverser<C, S>>, CFunction<C> {
+
+ public FilterFunction<C, S> clone();
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
index 573af55..4a2aa8e 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/FlatMapFunction.java
@@ -27,4 +27,6 @@ import java.util.function.Function;
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface FlatMapFunction<C, S, E> extends Function<Traverser<C, S>, Iterator<E>>, CFunction<C> {
+
+ public FlatMapFunction<C, S, E> clone();
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
index e18738a..6d4d254 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/MapFunction.java
@@ -27,4 +27,6 @@ import java.util.function.Function;
*/
public interface MapFunction<C, S, E> extends Function<Traverser<C, S>, E>, CFunction<C> {
+ public MapFunction<C, S, E> clone();
+
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
index f71c14c..07639e1 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/filter/IdentityFilter.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
import org.apache.tinkerpop.machine.function.FilterFunction;
import org.apache.tinkerpop.machine.traverser.Traverser;
-import java.util.Set;
-
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -40,6 +38,11 @@ public final class IdentityFilter<C, S> extends AbstractFunction<C> implements F
return true;
}
+ @Override
+ public IdentityFilter<C, S> clone() {
+ return (IdentityFilter<C, S>) super.clone();
+ }
+
public static <C, S> IdentityFilter<C, S> compile(final Instruction<C> instruction) {
return new IdentityFilter<>(instruction.coefficient(), instruction.label());
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
index cb44d61..3d34bb2 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/flatmap/UnfoldFlatMap.java
@@ -29,14 +29,13 @@ import org.apache.tinkerpop.machine.util.IteratorUtils;
import java.lang.reflect.Array;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements FlatMapFunction<C, S, E> {
- public UnfoldFlatMap(final Coefficient<C> coefficient, final String label) {
+ private UnfoldFlatMap(final Coefficient<C> coefficient, final String label) {
super(coefficient, label);
}
@@ -55,6 +54,11 @@ public final class UnfoldFlatMap<C, S, E> extends AbstractFunction<C> implements
return IteratorUtils.of((E) object);
}
+ @Override
+ public UnfoldFlatMap<C, S, E> clone() {
+ return (UnfoldFlatMap<C, S, E>) super.clone();
+ }
+
private final Iterator<E> handleArrays(final Object array) {
if (array instanceof Object[]) {
return new ArrayIterator<>((E[]) array);
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
index 2abe342..ba462b2 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/ConstantMap.java
@@ -52,7 +52,13 @@ public final class ConstantMap<C, S, E> extends AbstractFunction<C> implements M
return StringFactory.makeFunctionString(this, this.constant);
}
+ @Override
+ public ConstantMap<C, S, E> clone() {
+ return (ConstantMap<C, S, E>) super.clone();
+ }
+
public static <C, S, E> ConstantMap<C, S, E> compile(final Instruction<C> instruction) {
return new ConstantMap<>(instruction.coefficient(), instruction.label(), (E) instruction.args()[0]);
}
+
}
\ No newline at end of file
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
index e4a9ad5..735206c 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/IncrMap.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.traverser.Traverser;
-import java.util.Set;
-
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -40,6 +38,11 @@ public final class IncrMap<C> extends AbstractFunction<C> implements MapFunction
return traverser.object() + 1L;
}
+ @Override
+ public IncrMap<C> clone() {
+ return (IncrMap<C>) super.clone();
+ }
+
public static <C> IncrMap<C> compile(final Instruction<C> instruction) {
return new IncrMap<>(instruction.coefficient(), instruction.label());
}
diff --git a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
index f7f04e8..2e332fe 100644
--- a/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
+++ b/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/function/map/LoopsMap.java
@@ -24,8 +24,6 @@ import org.apache.tinkerpop.machine.function.AbstractFunction;
import org.apache.tinkerpop.machine.function.MapFunction;
import org.apache.tinkerpop.machine.traverser.Traverser;
-import java.util.Set;
-
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -40,6 +38,11 @@ public final class LoopsMap<C, S> extends AbstractFunction<C> implements MapFunc
return traverser.loops();
}
+ @Override
+ public LoopsMap<C, S> clone() {
+ return (LoopsMap<C, S>) super.clone();
+ }
+
public static <C, S> LoopsMap<C, S> compile(final Instruction<C> instruction) {
return new LoopsMap<>(instruction.coefficient(), instruction.label());
}
diff --git a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
index 322c8e9..b65460f 100644
--- a/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
+++ b/java/machine/processor/beam/src/test/java/org/apache/tinkerpop/machine/processor/beam/SimpleRemoteTest.java
@@ -25,6 +25,8 @@ import org.apache.tinkerpop.machine.species.remote.MachineServer;
import org.apache.tinkerpop.machine.species.remote.RemoteMachine;
import org.junit.jupiter.api.AfterAll;
+import java.util.Map;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -34,7 +36,10 @@ public class SimpleRemoteTest extends SimpleTestSuite {
private static MachineServer SERVER = new MachineServer(7777);
static {
- BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR, BeamProcessor.class);
+ BYTECODE.addSourceInstruction(CoreCompiler.Symbols.WITH_PROCESSOR,
+ BeamProcessor.class,
+ Map.of(BeamProcessor.TRAVERSER_SERVER_LOCATION,
+ "localhost", BeamProcessor.TRAVERSER_SERVER_PORT, 6666));
}
SimpleRemoteTest() {
diff --git a/java/machine/processor/rxjava/pom.xml b/java/machine/processor/rxjava/pom.xml
index 1c5c276..c699b30 100644
--- a/java/machine/processor/rxjava/pom.xml
+++ b/java/machine/processor/rxjava/pom.xml
@@ -41,6 +41,12 @@ limitations under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>pipes</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<directory>${basedir}/target</directory>
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
index ec686f7..fe02ffe 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FilterFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
*/
final class FilterFlow<C, S> implements Predicate<Traverser<C, S>> {
- private final ThreadLocal<FilterFunction<C, S>> function;
+ private final ThreadLocal<FilterFunction<C, S>> filterFunction;
- FilterFlow(final FilterFunction<C, S> function) {
- this.function = ThreadLocal.withInitial(() -> (FilterFunction) function.clone());
+ FilterFlow(final FilterFunction<C, S> filterFunction) {
+ this.filterFunction = ThreadLocal.withInitial(filterFunction::clone);
}
@Override
public boolean test(final Traverser<C, S> traverser) {
- return this.function.get().test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
+ return this.filterFunction.get().test(traverser); // todo: make this 0/1-flatmap so traverser splitting is correct
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
index 2d1cd9b..aef4238 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/FlatMapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
*/
final class FlatMapFlow<C, S, E> implements Function<Traverser<C, S>, Iterable<Traverser<C, E>>> {
- private ThreadLocal<FlatMapFunction<C, S, E>> function;
+ private ThreadLocal<FlatMapFunction<C, S, E>> flatMapFunction;
- FlatMapFlow(final FlatMapFunction<C, S, E> function) {
- this.function = ThreadLocal.withInitial(() -> (FlatMapFunction) function.clone());
+ FlatMapFlow(final FlatMapFunction<C, S, E> flatMapFunction) {
+ this.flatMapFunction = ThreadLocal.withInitial(flatMapFunction::clone);
}
@Override
public Iterable<Traverser<C, E>> apply(final Traverser<C, S> traverser) {
- return () -> traverser.flatMap(this.function.get());
+ return () -> traverser.flatMap(this.flatMapFunction.get());
}
}
diff --git a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
index 57e934e..33e2659 100644
--- a/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
+++ b/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
@@ -27,14 +27,14 @@ import org.apache.tinkerpop.machine.traverser.Traverser;
*/
final class MapFlow<C, S, E> implements Function<Traverser<C, S>, Traverser<C, E>> {
- private final ThreadLocal<MapFunction<C, S, E>> function;
+ private final ThreadLocal<MapFunction<C, S, E>> mapFunction;
- MapFlow(final MapFunction<C, S, E> function) {
- this.function = ThreadLocal.withInitial(() -> (MapFunction)function.clone());
+ MapFlow(final MapFunction<C, S, E> mapFunction) {
+ this.mapFunction = ThreadLocal.withInitial(mapFunction::clone);
}
@Override
public Traverser<C, E> apply(final Traverser<C, S> traverser) {
- return traverser.map(this.function.get());
+ return traverser.map(this.mapFunction.get());
}
}
diff --git a/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
new file mode 100644
index 0000000..1051c47
--- /dev/null
+++ b/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.machine.processor.rxjava;
+
+import org.apache.tinkerpop.language.gremlin.Gremlin;
+import org.apache.tinkerpop.language.gremlin.TraversalSource;
+import org.apache.tinkerpop.language.gremlin.common.__;
+import org.apache.tinkerpop.machine.Machine;
+import org.apache.tinkerpop.machine.processor.pipes.PipesProcessor;
+import org.apache.tinkerpop.machine.species.LocalMachine;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+class RxJavaBenchmark {
+
+ @Test
+ public void benchmark() {
+ final Machine machine = LocalMachine.open();
+ final TraversalSource ser = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
+ final TraversalSource par = Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class, Map.of(RxJavaProcessor.RXJAVA_THREADS, Runtime.getRuntime().availableProcessors() - 1));
+ final TraversalSource pipes = Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
+ final List<Long> input = new ArrayList<>(1000);
+ for (long i = 0; i < 100000; i++) {
+ input.add(i+1);
+ }
+ final int runs = 30;
+ System.out.println("Threads used: " + (Runtime.getRuntime().availableProcessors() - 1));
+ System.out.println("Input size: " + input.size());
+ float serTime = 0.0f;
+ float parTime = 0.0f;
+ float pipTime = 0.0f;
+ for (int i = 0; i < runs; i++) {
+ String source = "ser";
+ for (final TraversalSource g : List.of(ser, par, pipes)) {
+ final long time = System.currentTimeMillis();
+ g.inject(input).unfold().repeat(__.incr()).times(4).iterate();
+ if (i > 1) {
+ if ("ser".equals(source)) {
+ serTime = serTime + (System.currentTimeMillis() - time);
+ source = "par";
+ } else if ("par".equals(source)) {
+ parTime = parTime + (System.currentTimeMillis() - time);
+ source = "pip";
+ } else {
+ pipTime = pipTime + (System.currentTimeMillis() - time);
+ source = "ser";
+ }
+ }
+
+ }
+ }
+ System.out.println("Average time [seri]: " + serTime / runs);
+ System.out.println("Average time [para]: " + parTime / runs);
+ System.out.println("Average time [pipe]: " + pipTime / runs);
+ }
+}