You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/28 21:44:27 UTC

incubator-tinkerpop git commit: Large-scale benchmarking of SparkGraphComputer with the Friendster dataset identified a combiner() issue. I wasn't combining into sufficiently small chunks per partition. This was extremely wasteful and on our large scale b...

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 93b112d40 -> 318eb34be


Large-scale benchmarking of SparkGraphComputer with the Friendster dataset identified a combiner() issue: I wasn't combining into sufficiently small chunks per partition. This was extremely wasteful and, in our large-scale benchmarking, led to GC stalls in the MapReduce phases. I wrote a really solid test suite for CombineIterator (see CombineIteratorTest) that demonstrates proper aggregation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/318eb34b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/318eb34b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/318eb34b

Branch: refs/heads/master
Commit: 318eb34be68a441fa2df0ec17b56eb3acef3be7d
Parents: 93b112d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 28 13:44:20 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 28 13:44:20 2016 -0700

----------------------------------------------------------------------
 .../gremlin/process/computer/MapReduce.java     |  13 +-
 .../spark/process/computer/CombineIterator.java |  32 +--
 .../process/computer/CombineIteratorTest.java   | 193 +++++++++++++++++++
 3 files changed, 219 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
index a75c4c8..3e1eae6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -271,7 +271,7 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
     /**
      * A convenience singleton when a single key is needed so that all emitted values converge to the same combiner/reducer.
      */
-    public static class NullObject implements Comparable, Serializable {
+    public static class NullObject implements Comparable<NullObject>, Serializable {
         private static final NullObject INSTANCE = new NullObject();
         private static final String NULL_OBJECT = "";
 
@@ -281,20 +281,17 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
 
         @Override
         public int hashCode() {
-            return 0;
+            return -9832049;
         }
 
         @Override
         public boolean equals(final Object object) {
-            return object instanceof NullObject;
+            return this == object || object instanceof NullObject;
         }
 
         @Override
-        public int compareTo(final Object object) {
-            if (object instanceof NullObject)
-                return 0;
-            else
-                throw new IllegalArgumentException("The " + NullObject.class.getSimpleName() + " can not be compared with " + object.getClass().getSimpleName());
+        public int compareTo(final NullObject object) {
+            return 0;
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
index 9ca5bce..b6c544f 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
@@ -38,6 +38,7 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
     private final MapReduce<K, V, OK, OV, ?> mapReduce;
     private final CombineIteratorEmitter combineIteratorEmitter = new CombineIteratorEmitter();
     private final Map<K, List<V>> combineMap = new ConcurrentHashMap<>();
+    private boolean combined = true;
 
     public CombineIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, V>> inputIterator) {
         this.inputIterator = inputIterator;
@@ -74,8 +75,8 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
     private static final int MAX_SIZE = 5000;
 
     private void processNext() {
-        int sizeCounter = this.combineMap.size();
-        while (sizeCounter < MAX_SIZE && this.inputIterator.hasNext()) {
+        int combinedSize = this.combineMap.size();
+        while (combinedSize < MAX_SIZE && this.inputIterator.hasNext()) {
             final Tuple2<K, V> keyValue = this.inputIterator.next();
             List<V> values = this.combineMap.get(keyValue._1());
             if (null == values) {
@@ -83,20 +84,30 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
                 this.combineMap.put(keyValue._1(), values);
             }
             values.add(keyValue._2());
-            if (++sizeCounter > MAX_SIZE) {
-                for (final K key : this.combineMap.keySet()) {
-                    final List<V> values2 = this.combineMap.get(key);
-                    if (values2.size() > 1) {
-                        this.combineMap.remove(key);
-                        this.mapReduce.combine(key, values2.iterator(), this.combineIteratorEmitter);
-                    }
+            combinedSize++;
+            this.combined = false;
+            if (combinedSize >= MAX_SIZE) {
+                this.doCombine();
+                combinedSize = this.combineMap.size();
+            }
+        }
+    }
+
+    private void doCombine() {
+        if (!this.combined) {
+            for (final K key : this.combineMap.keySet()) {
+                final List<V> values2 = this.combineMap.get(key);
+                if (values2.size() > 1) {
+                    this.combineMap.remove(key);
+                    this.mapReduce.combine(key, values2.iterator(), this.combineIteratorEmitter);
                 }
-                sizeCounter = this.combineMap.size();
             }
+            this.combined = true;
         }
     }
 
     private Tuple2<OK, OV> nextFromCombineMap() {
+        this.doCombine();
         final OK key = (OK) this.combineMap.keySet().iterator().next();
         final List<OV> values = (List<OV>) this.combineMap.get(key);
         final Tuple2<OK, OV> keyValue = new Tuple2<>(key, values.remove(0));
@@ -106,7 +117,6 @@ public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK,
     }
 
     private class CombineIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
-
         @Override
         public void emit(final OK key, OV value) {
             List<V> values = combineMap.get(key);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/318eb34b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
new file mode 100644
index 0000000..e5458b0
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIteratorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class CombineIteratorTest {
+
+    @Test
+    public void shouldBulkResults() {
+        long total = 0;
+        final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 100; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>("test", i));
+        }
+
+        final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+        final Tuple2<String, Long> tuple = combineIterator.next();
+        assertEquals("test", tuple._1());
+        assertEquals(Long.valueOf(total), tuple._2());
+        assertFalse(combineIterator.hasNext());
+    }
+
+    @Test
+    public void shouldDoubleBulkResults() {
+        long total = 0;
+        final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 9000; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>("test", i));
+        }
+
+        final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+        assertTrue(combineIterator.hasNext());
+        final Tuple2<String, Long> tuple = combineIterator.next();
+        assertFalse(combineIterator.hasNext());
+        assertEquals("test", tuple._1());
+        assertEquals(total, tuple._2().longValue());
+    }
+
+    @Test
+    public void shouldTripleBulkResults() {
+        long total = 0;
+        final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 14000; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>("test", i));
+        }
+
+        final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+        assertTrue(combineIterator.hasNext());
+        final Tuple2<String, Long> tuple = combineIterator.next();
+        assertFalse(combineIterator.hasNext());
+        assertEquals("test", tuple._1());
+        assertEquals(total, tuple._2().longValue());
+    }
+
+    @Test
+    public void shouldEndlessBulkResults() {
+        long total = 0;
+        final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 5000000; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>("test", i));
+        }
+
+        final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+        assertTrue(combineIterator.hasNext());
+        final Tuple2<String, Long> tuple = combineIterator.next();
+        assertFalse(combineIterator.hasNext());
+        assertEquals("test", tuple._1());
+        assertEquals(total, tuple._2().longValue());
+    }
+
+    @Test
+    public void shouldEndlessBulkResultsWithNullObject() {
+        long total = 0;
+        final List<Tuple2<MapReduce.NullObject, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 5000000; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>(MapReduce.NullObject.instance(), i));
+        }
+
+        final CombineIterator<MapReduce.NullObject, Long, MapReduce.NullObject, Long> combineIterator = new CombineIterator<>(new MapReduceB(), numbers.iterator());
+        assertTrue(combineIterator.hasNext());
+        final Tuple2<MapReduce.NullObject, Long> tuple = combineIterator.next();
+        assertFalse(combineIterator.hasNext());
+        assertEquals(MapReduce.NullObject.instance(), tuple._1());
+        assertEquals(total, tuple._2().longValue());
+    }
+
+    @Test
+    public void shouldBulkResultsByKey() {
+        long total = 0;
+        final List<Tuple2<String, Long>> numbers = new ArrayList<>();
+        for (long i = 0; i < 9000; i++) {
+            total = total + i;
+            numbers.add(new Tuple2<>(UUID.randomUUID().toString(), i));
+        }
+
+        final CombineIterator<String, Long, String, Long> combineIterator = new CombineIterator<>(new MapReduceA(), numbers.iterator());
+        assertEquals(9000, IteratorUtils.count(combineIterator));
+    }
+
+    private static class MapReduceA extends StaticMapReduce<String, Long, String, Long, Long> {
+
+        @Override
+        public void combine(final String key, final Iterator<Long> values, final ReduceEmitter<String, Long> emitter) {
+            long counter = 0;
+            while (values.hasNext()) {
+                counter = counter + values.next();
+            }
+            emitter.emit(key, counter);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return true;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return "test";
+        }
+
+        @Override
+        public Long generateFinalResult(final Iterator<KeyValue<String, Long>> keyValues) {
+            return keyValues.next().getValue();
+        }
+    }
+
+    private static class MapReduceB extends StaticMapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> {
+
+        @Override
+        public void combine(final MapReduce.NullObject key, final Iterator<Long> values, final ReduceEmitter<MapReduce.NullObject, Long> emitter) {
+            long counter = 0;
+            while (values.hasNext()) {
+                counter = counter + values.next();
+            }
+            emitter.emit(key, counter);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return true;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return "test";
+        }
+
+        @Override
+        public Long generateFinalResult(final Iterator<KeyValue<MapReduce.NullObject, Long>> keyValues) {
+            return keyValues.next().getValue();
+        }
+    }
+}