Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/26 14:14:32 UTC

incubator-tinkerpop git commit: During my manual testing, I found another memory hole in SparkGraphComputer in the MapReduce-zone. If the data isn't partitioned nicely, then an OME is easily achieved. This push turns map(), combine(), and reduce() into l

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 9271813a0 -> df8d4b151


During my manual testing, I found another memory hole in SparkGraphComputer in the MapReduce-zone. If the data isn't partitioned nicely, then an OME is easily achieved. This push turns map(), combine(), and reduce() into lazy iterator operations. Moreover, I added combine() which limits the GC pressure on the groupByKey() in reduce(). The runtimes are relatively the same for the testing graph I'm using in my local 3-worker SparkServer environment, but fortunately, the GC is not acting up. I've run the full integration test suite for SparkGraphComputer. All is good in the hood. CTR.
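
The lazy-iterator idea described above can be sketched in isolation. What follows is a minimal illustration, not the code from this commit: the class name LazyEmitIterator and its BiConsumer-based emitter are assumptions made for the example, and it leaves out the MapReduce workerStart()/workerEnd() lifecycle that the real MapIterator and ReduceIterator handle. The point it shows is that only the emissions of a single input element are buffered at any time, so memory use does not grow with the size of the partition.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of TinkerPop): lazily expands each input
 * element into zero or more output elements via an emitter callback.
 */
final class LazyEmitIterator<I, O> implements Iterator<O> {

    private final Iterator<I> input;
    private final BiConsumer<I, Consumer<O>> function;
    // Holds the not-yet-consumed emissions; ArrayDeque rejects null elements.
    private final Queue<O> queue = new ArrayDeque<>();

    LazyEmitIterator(final Iterator<I> input, final BiConsumer<I, Consumer<O>> function) {
        this.input = input;
        this.function = function;
    }

    @Override
    public boolean hasNext() {
        // Pull inputs one at a time until something is buffered or the input
        // runs out. A loop is used here instead of recursion so that long runs
        // of inputs that emit nothing cannot deepen the call stack.
        while (this.queue.isEmpty() && this.input.hasNext())
            this.function.accept(this.input.next(), this.queue::add);
        return !this.queue.isEmpty();
    }

    @Override
    public O next() {
        if (!this.hasNext())
            throw new NoSuchElementException();
        return this.queue.remove();
    }
}
```

As a usage example, wrapping an iterator over the lines "a b", "" and "c" with a function that emits every non-empty token of a line yields "a", "b" and "c" in that order, and the empty line contributes nothing. In a Spark setting, an iterator of this shape would be what a mapPartitionsToPair() function returns, so that downstream stages pull results on demand.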


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/df8d4b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/df8d4b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/df8d4b15

Branch: refs/heads/master
Commit: df8d4b151862b9448e6280f9d5d7a52576548591
Parents: 9271813
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 26 06:14:15 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 26 06:14:28 2016 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   2 +
 .../gremlin/hadoop/structure/HadoopGraph.java   |   5 +
 .../spark/process/computer/CombineIterator.java | 117 +++++++++++++++++++
 .../spark/process/computer/MapIterator.java     |  86 ++++++++++++++
 .../spark/process/computer/ReduceIterator.java  |  85 ++++++++++++++
 .../spark/process/computer/SparkExecutor.java   |  31 ++---
 .../process/computer/SparkGraphComputer.java    |   7 +-
 .../spark/process/computer/SparkMapEmitter.java |  45 -------
 .../process/computer/SparkReduceEmitter.java    |  45 -------
 9 files changed, 309 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index ee241fd..a61153e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
 TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Added a lazy iterator, memory safe implementation of MapReduce to `SparkGraphComputer`.
+* Added `MapReduce.combine()` support to `SparkGraphComputer`.
 * Bumped to Neo4j 2.3.2.
 * Fixed Java comparator contract issue around `Order.shuffle`.
 * Optimized a very inefficient implementation of `SampleLocalStep`.

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index d89e9fb..66f909c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -171,6 +171,11 @@ import java.util.stream.Stream;
         test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTestV3d0",
         method = "g_V_repeatXbothXfollowedByXX_timesX2X_groupXaX_byXsongTypeX_byXcountX_capXaX",
         reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. This particular tests takes many minutes to execute.")
+@Graph.OptOut(
+        test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
+        method = "shouldStartAndEndWorkersForVertexProgramAndMapReduce",
+        reason = "Spark executes map and combine in a lazy fashion and thus, fails the blocking aspect of this test",
+        computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
 public final class HadoopGraph implements Graph {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
new file mode 100644
index 0000000..a6e3a11
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK, OV>> {
+
+    private final Iterator<Tuple2<K, V>> inputIterator;
+    private final MapReduce<K, V, OK, OV, ?> mapReduce;
+    private final CombineIteratorEmitter combineIteratorEmitter = new CombineIteratorEmitter();
+    private final Map<K, List<V>> combineMap = new ConcurrentHashMap<>();
+
+    public CombineIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, V>> inputIterator) {
+        this.inputIterator = inputIterator;
+        this.mapReduce = mapReduce;
+        this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if (!this.combineMap.isEmpty())
+            return true;
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
+            return false;
+        } else {
+            this.processNext();
+            return this.hasNext();
+        }
+    }
+
+    @Override
+    public Tuple2<OK, OV> next() {
+        if (!this.combineMap.isEmpty())
+            return this.nextFromCombineMap();
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
+            throw FastNoSuchElementException.instance();
+        } else {
+            this.processNext();
+            return this.next();
+        }
+    }
+
+    private void processNext() {
+        while (this.inputIterator.hasNext() && this.combineMap.size() < 500) {
+            final Tuple2<K, V> keyValue = this.inputIterator.next();
+            List<V> values = this.combineMap.get(keyValue._1());
+            if (null == values) {
+                values = new ArrayList<>();
+                this.combineMap.put(keyValue._1(), values);
+            }
+            values.add(keyValue._2());
+            if (values.size() > 1000)
+                break;
+        }
+        for (final K key : this.combineMap.keySet()) {
+            final List<V> values = this.combineMap.get(key);
+            if (values.size() > 1) {
+                this.combineMap.remove(key);
+                this.mapReduce.combine(key, values.iterator(), this.combineIteratorEmitter);
+            }
+        }
+    }
+
+    private Tuple2<OK, OV> nextFromCombineMap() {
+        final OK key = (OK) this.combineMap.keySet().iterator().next();
+        final List<OV> values = (List<OV>) this.combineMap.get(key);
+        final Tuple2<OK, OV> keyValue = new Tuple2<>(key, values.remove(0));
+        if (values.isEmpty())
+            this.combineMap.remove(key);
+        return keyValue;
+    }
+
+    private class CombineIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
+
+        @Override
+        public void emit(final OK key, OV value) {
+            List<V> values = combineMap.get(key);
+            if (null == values) {
+                values = new ArrayList<>();
+                combineMap.put((K) key, values);
+            }
+            values.add((V) value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
new file mode 100644
index 0000000..cdd62cb
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MapIterator<K, V> implements Iterator<Tuple2<K, V>> {
+
+    private final Iterator<Tuple2<Object, VertexWritable>> inputIterator;
+    private final MapReduce<K, V, ?, ?, ?> mapReduce;
+    private final Queue<Tuple2<K, V>> queue = new LinkedList<>();
+    private final MapIteratorEmitter mapIteratorEmitter = new MapIteratorEmitter();
+
+    public MapIterator(final MapReduce<K, V, ?, ?, ?> mapReduce, final Iterator<Tuple2<Object, VertexWritable>> inputIterator) {
+        this.inputIterator = inputIterator;
+        this.mapReduce = mapReduce;
+        this.mapReduce.workerStart(MapReduce.Stage.MAP);
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if (!this.queue.isEmpty())
+            return true;
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+            return false;
+        } else {
+            this.processNext();
+            return this.hasNext();
+        }
+    }
+
+    @Override
+    public Tuple2<K, V> next() {
+        if (!this.queue.isEmpty())
+            return this.queue.remove();
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+            throw FastNoSuchElementException.instance();
+        } else {
+            this.processNext();
+            return this.next();
+        }
+    }
+
+    private void processNext() {
+        this.mapReduce.map(ComputerGraph.mapReduce(this.inputIterator.next()._2().get()), this.mapIteratorEmitter);
+    }
+
+    private class MapIteratorEmitter implements MapReduce.MapEmitter<K, V> {
+
+        @Override
+        public void emit(final K key, V value) {
+            queue.add(new Tuple2<>(key, value));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
new file mode 100644
index 0000000..e3127f7
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ReduceIterator<K, V, OK, OV> implements Iterator<Tuple2<OK, OV>> {
+
+    private final Iterator<Tuple2<K, Iterable<V>>> inputIterator;
+    private final MapReduce<K, V, OK, OV, ?> mapReduce;
+    private final Queue<Tuple2<OK, OV>> queue = new LinkedList<>();
+    private final ReduceIteratorEmitter reduceIteratorEmitter = new ReduceIteratorEmitter();
+
+    public ReduceIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, Iterable<V>>> inputIterator) {
+        this.inputIterator = inputIterator;
+        this.mapReduce = mapReduce;
+        this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        if (!this.queue.isEmpty())
+            return true;
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
+            return false;
+        } else {
+            this.processNext();
+            return this.hasNext();
+        }
+    }
+
+    @Override
+    public Tuple2<OK, OV> next() {
+        if (!this.queue.isEmpty())
+            return this.queue.remove();
+        else if (!this.inputIterator.hasNext()) {
+            this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
+            throw FastNoSuchElementException.instance();
+        } else {
+            this.processNext();
+            return this.next();
+        }
+    }
+
+    private void processNext() {
+        final Tuple2<K, Iterable<V>> nextKeyValues = this.inputIterator.next();
+        this.mapReduce.reduce(nextKeyValues._1(), nextKeyValues._2().iterator(), this.reduceIteratorEmitter);
+    }
+
+    private class ReduceIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
+
+        @Override
+        public void emit(final OK key, OV value) {
+            queue.add(new Tuple2<>(key, value));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index d3f31cb..5889bdb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -156,35 +156,24 @@ public final class SparkExecutor {
     public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.MAP);
-            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
-                workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-                return mapEmitter.getEmissions();
-            });
+            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
         return mapRDD;
     }
 
-    // TODO: public static executeCombine()  is this necessary?  YES --- we groupByKey in reduce() where we want to combine first.
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+        return mapRDD.mapPartitionsToPair(partitionIterator -> {
+            HadoopPools.initialize(apacheConfiguration);
+            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+        });
+    }
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
-        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             HadoopPools.initialize(apacheConfiguration);
-            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
-            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
-            return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
-                if (!partitionIterator.hasNext())
-                    workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-                return reduceEmitter.getEmissions();
-            });
+            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 30f4dba..58a0af6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -237,15 +237,16 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
-                        // combine TODO: is this really needed?
+                        // combine
+                        final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
                         // reduce
-                        final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+                        final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                         // write the map reduce output back to disk (memory)
                         try {
                             mapReduce.addResultToMemory(finalMemory,
                                     hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
                                             .newInstance()
-                                            .writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), null == reduceRDD ? mapRDD : reduceRDD));
+                                            .writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                         } catch (final InstantiationException | IllegalAccessException e) {
                             throw new IllegalStateException(e.getMessage(), e);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
deleted file mode 100644
index cf31249..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
-
-    private List<Tuple2<K, V>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final K key, final V value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<K, V>> getEmissions() {
-        final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
deleted file mode 100644
index e2252fa..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
-
-    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
-
-    @Override
-    public void emit(final OK key, final OV value) {
-        this.emissions.add(new Tuple2<>(key, value));
-    }
-
-    public Iterator<Tuple2<OK, OV>> getEmissions() {
-        final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
-        this.emissions = new ArrayList<>();
-        return iterator;
-    }
-}