You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 21:44:27 UTC
incubator-tinkerpop git commit: large-scale benchmarking of
SparkGraphComputer with the Friendster dataset identified a combiner() issue.
I wasn't combining into sufficiently small chunks per partition. This was
extremely wasteful and, in our large-scale benchmarking, led to GC stalls in MapReduce phases.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 93b112d40 -> 318eb34be
Large-scale benchmarking of SparkGraphComputer with the Friendster dataset identified a combiner() issue. I wasn't combining into sufficiently small chunks per partition. This was extremely wasteful and, in our large-scale benchmarking, led to GC stalls in MapReduce phases. I wrote a really solid test suite for CombineIterator that demonstrates proper aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/318eb34b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/318eb34b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/318eb34b
Branch: refs/heads/master
Commit: 318eb34be68a441fa2df0ec17b56eb3acef3be7d
Parents: 93b112d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 28 13:44:20 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 28 13:44:20 2016 -0700
----------------------------------------------------------------------
.../gremlin/process/computer/MapReduce.java | 13 +-
.../spark/process/computer/CombineIterator.java | 32 +--
.../process/computer/CombineIteratorTest.java | 193 +++++++++++++++++++
3 files changed, 219 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
index a75c4c8..3e1eae6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -271,7 +271,7 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
/**
* A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
*/
- public static class NullObject implements Comparable, Serializable {
+ public static class NullObject implements Comparable<NullObject>, Serializable {
private static final NullObject INSTANCE = new NullObject();
private static final String NULL_OBJECT = "";
@@ -281,20 +281,17 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
@Override
public int hashCode() {
- return 0;
+ return -9832049;
}
@Override
public boolean equals(final Object object) {
- return object instanceof NullObject;
+ return this == object || object instanceof NullObject;
}
@Override
- public int compareTo(final Object object) {
- if (object instanceof NullObject)
- return 0;
- else
- throw new IllegalArgumentException("The " + NullObject.class.getSimpleName() + " can not be compared with " + object.getClass().getSimpleName());
+ public int compareTo(final NullObject object) {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
index 9ca5bce..b6c544f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
@@ -38,6 +38,7 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
private final MapReduce<K, V, OK, OV, ?> mapReduce;
private final CombineIteratorEmitter combineIteratorEmitter = new CombineIteratorEmitter();
private final Map<K, List<V>> combineMap = new ConcurrentHashMap<>();
+ private boolean combined = true;
public CombineIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, V>> inputIterator) {
this.inputIterator = inputIterator;
@@ -74,8 +75,8 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
private static final int MAX_SIZE = 5000;
private void processNext() {
- int sizeCounter = this.combineMap.size();
- while (sizeCounter < MAX_SIZE && this.inputIterator.hasNext()) {
+ int combinedSize = this.combineMap.size();
+ while (combinedSize < MAX_SIZE && this.inputIterator.hasNext()) {
final Tuple2<K, V> keyValue = this.inputIterator.next();
List<V> values = this.combineMap.get(keyValue._1());
if (null == values) {
@@ -83,20 +84,30 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
this.combineMap.put(keyValue._1(), values);
}
values.add(keyValue._2());
- if (++sizeCounter > MAX_SIZE) {
- for (final K key : this.combineMap.keySet()) {
- final List<V> values2 = this.combineMap.get(key);
- if (values2.size() > 1) {
- this.combineMap.remove(key);
- this.mapReduce.combine(key, values2.iterator(), this.combineIteratorEmitter);
- }
+ combinedSize++;
+ this.combined = false;
+ if (combinedSize >= MAX_SIZE) {
+ this.doCombine();
+ combinedSize = this.combineMap.size();
+ }
+ }
+ }
+
+ private void doCombine() {
+ if (!this.combined) {
+ for (final K key : this.combineMap.keySet()) {
+ final List<V> values2 = this.combineMap.get(key);
+ if (values2.size() > 1) {
+ this.combineMap.remove(key);
+ this.mapReduce.combine(key, values2.iterator(), this.combineIteratorEmitter);
}
- sizeCounter = this.combineMap.size();
}
+ this.combined = true;
}
}
private Tuple2<OK, OV> nextFromCombineMap() {
+ this.doCombine();
final OK key = (OK) this.combineMap.keySet().iterator().next();
final List<OV> values = (List<OV>) this.combineMap.get(key);
final Tuple2<OK, OV> keyValue = new Tuple2<>(key, values.remove(0));
@@ -106,7 +117,6 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
}
private class CombineIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
-
@Override
public void emit(final OK key, OV value) {
List<V> values = combineMap.get(key);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
new file mode 100644
index 0000000..e5458b0
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class CombineIteratorTest {
+
+ @Test
+ public void shouldBulkResults() {
+ long total = 0;
+ final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 100; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>("test", i));
+ }
+
+ final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+ final Tuple2<String, Long> tuple = combineIterator.next();
+ assertEquals("test", tuple._1());
+ assertEquals(Long.valueOf(total), tuple._2());
+ assertFalse(combineIterator.hasNext());
+ }
+
+ @Test
+ public void shouldDoubleBulkResults() {
+ long total = 0;
+ final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 9000; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>("test", i));
+ }
+
+ final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+ assertTrue(combineIterator.hasNext());
+ final Tuple2<String, Long> tuple = combineIterator.next();
+ assertFalse(combineIterator.hasNext());
+ assertEquals("test", tuple._1());
+ assertEquals(total, tuple._2().longValue());
+ }
+
+ @Test
+ public void shouldTripleBulkResults() {
+ long total = 0;
+ final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 14000; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>("test", i));
+ }
+
+ final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+ assertTrue(combineIterator.hasNext());
+ final Tuple2<String, Long> tuple = combineIterator.next();
+ assertFalse(combineIterator.hasNext());
+ assertEquals("test", tuple._1());
+ assertEquals(total, tuple._2().longValue());
+ }
+
+ @Test
+ public void shouldEndlessBulkResults() {
+ long total = 0;
+ final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 5000000; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>("test", i));
+ }
+
+ final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+ assertTrue(combineIterator.hasNext());
+ final Tuple2<String, Long> tuple = combineIterator.next();
+ assertFalse(combineIterator.hasNext());
+ assertEquals("test", tuple._1());
+ assertEquals(total, tuple._2().longValue());
+ }
+
+ @Test
+ public void shouldEndlessBulkResultsWithNullObject() {
+ long total = 0;
+ final List<Tuple2<MapReduce.NullObject, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 5000000; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>(MapReduce.NullObject.instance(), i));
+ }
+
+ final CombineIterator<MapReduce.NullObject, Long, MapReduce.NullObject, Long> combineIterator = new CombineIterator<>(new MapReduceB(), numbers.iterator());
+ assertTrue(combineIterator.hasNext());
+ final Tuple2<MapReduce.NullObject, Long> tuple = combineIterator.next();
+ assertFalse(combineIterator.hasNext());
+ assertEquals(MapReduce.NullObject.instance(), tuple._1());
+ assertEquals(total, tuple._2().longValue());
+ }
+
+ @Test
+ public void shouldBulkResultsByKey() {
+ long total = 0;
+ final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+ for (long i = 0; i < 9000; i++) {
+ total = total + i;
+ numbers.add(new Tuple2<>(UUID.randomUUID().toString(), i));
+ }
+
+ final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+ assertEquals(9000, IteratorUtils.count(combineIterator));
+ }
+
+ private static class MapReduceA extends StaticMapReduce<String, Long, String, Long, Long> {
+
+ @Override
+ public void combine(final String key, final Iterator<Long> values, final ReduceEmitter<String, Long> emitter) {
+ long counter = 0;
+ while (values.hasNext()) {
+ counter = counter + values.next();
+ }
+ emitter.emit(key, counter);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return "test";
+ }
+
+ @Override
+ public Long generateFinalResult(final Iterator<KeyValue<String, Long>> keyValues) {
+ return keyValues.next().getValue();
+ }
+ }
+
+ private static class MapReduceB extends StaticMapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> {
+
+ @Override
+ public void combine(final MapReduce.NullObject key, final Iterator<Long> values, final ReduceEmitter<MapReduce.NullObject, Long> emitter) {
+ long counter = 0;
+ while (values.hasNext()) {
+ counter = counter + values.next();
+ }
+ emitter.emit(key, counter);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return "test";
+ }
+
+ @Override
+ public Long generateFinalResult(final Iterator<KeyValue<MapReduce.NullObject, Long>> keyValues) {
+ return keyValues.next().getValue();
+ }
+ }
+}