Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/26 14:14:32 UTC
incubator-tinkerpop git commit: During my manual testing, I found another memory hole in SparkGraphComputer in the MapReduce-zone. If the data isn't partitioned nicely, then an OME is easily achieved. This push turns map(), combine(), and reduce() into l…
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 9271813a0 -> df8d4b151
During my manual testing, I found another memory hole in SparkGraphComputer in the MapReduce-zone. If the data isn't partitioned nicely, an OutOfMemoryError (OOME) is easily triggered. This push turns map(), combine(), and reduce() into lazy iterator operations. Moreover, I added combine() support, which limits the GC pressure on the groupByKey() in reduce(). Runtimes are roughly the same on the test graph I'm using in my local 3-worker SparkServer environment, but, fortunately, the GC is not acting up. I've run the full integration test suite for SparkGraphComputer. All is good in the hood. CTR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/df8d4b15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/df8d4b15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/df8d4b15
Branch: refs/heads/master
Commit: df8d4b151862b9448e6280f9d5d7a52576548591
Parents: 9271813
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 26 06:14:15 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 26 06:14:28 2016 -0700
----------------------------------------------------------------------
CHANGELOG.asciidoc | 2 +
.../gremlin/hadoop/structure/HadoopGraph.java | 5 +
.../spark/process/computer/CombineIterator.java | 117 +++++++++++++++++++
.../spark/process/computer/MapIterator.java | 86 ++++++++++++++
.../spark/process/computer/ReduceIterator.java | 85 ++++++++++++++
.../spark/process/computer/SparkExecutor.java | 31 ++---
.../process/computer/SparkGraphComputer.java | 7 +-
.../spark/process/computer/SparkMapEmitter.java | 45 -------
.../process/computer/SparkReduceEmitter.java | 45 -------
9 files changed, 309 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index ee241fd..a61153e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,8 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* Added a lazy iterator, memory safe implementation of MapReduce to `SparkGraphComputer`.
+* Added `MapReduce.combine()` support to `SparkGraphComputer`.
* Bumped to Neo4j 2.3.2.
* Fixed Java comparator contract issue around `Order.shuffle`.
* Optimized a very inefficient implementation of `SampleLocalStep`.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index d89e9fb..66f909c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -171,6 +171,11 @@ import java.util.stream.Stream;
test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTestV3d0",
method = "g_V_repeatXbothXfollowedByXX_timesX2X_groupXaX_byXsongTypeX_byXcountX_capXaX",
reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. This particular test takes many minutes to execute.")
+@Graph.OptOut(
+ test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
+ method = "shouldStartAndEndWorkersForVertexProgramAndMapReduce",
+ reason = "Spark executes map and combine in a lazy fashion and thus fails the blocking aspect of this test",
+ computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"})
public final class HadoopGraph implements Graph {
public static final Logger LOGGER = LoggerFactory.getLogger(HadoopGraph.class);
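The opt-out reason above turns on when the worker lifecycle hooks fire. The following is a minimal sketch in plain Java (no TinkerPop or Spark classes; `LifecycleIterator` and its `Runnable` hooks are hypothetical stand-ins for `workerStart()` and `workerEnd()`) of an iterator that runs a start hook when it is constructed and an end hook only once its input has been drained. Until something consumes it, the end hook has simply not run, so a check that expects workers to have finished at a fixed point cannot rely on it under lazy evaluation.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch: start/end hooks tied to the lifecycle of a lazy iterator.
final class LifecycleIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final Runnable onEnd;   // stands in for workerEnd(stage)
    private boolean ended = false;  // guarantees the end hook runs at most once

    LifecycleIterator(final Iterator<T> input, final Runnable onStart, final Runnable onEnd) {
        this.input = input;
        this.onEnd = onEnd;
        onStart.run();              // stands in for workerStart(stage); runs at construction time
    }

    @Override
    public boolean hasNext() {
        if (this.input.hasNext())
            return true;
        if (!this.ended) {          // the end hook fires only once the input is drained
            this.ended = true;
            this.onEnd.run();
        }
        return false;
    }

    @Override
    public T next() {
        if (!this.hasNext())
            throw new NoSuchElementException();
        return this.input.next();
    }
}
```

The `ended` flag is a choice made for this sketch. As read from the diff, the committed `MapIterator`, `CombineIterator`, and `ReduceIterator` call `workerEnd()` on every `hasNext()` or `next()` call made after their input is exhausted, so a consumer that checks `hasNext()` more than once at the end would trigger it more than once.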
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
new file mode 100644
index 0000000..a6e3a11
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/CombineIterator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class CombineIterator<K, V, OK, OV> implements Iterator<Tuple2<OK, OV>> {
+
+ private final Iterator<Tuple2<K, V>> inputIterator;
+ private final MapReduce<K, V, OK, OV, ?> mapReduce;
+ private final CombineIteratorEmitter combineIteratorEmitter = new CombineIteratorEmitter();
+ private final Map<K, List<V>> combineMap = new ConcurrentHashMap<>();
+
+ public CombineIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, V>> inputIterator) {
+ this.inputIterator = inputIterator;
+ this.mapReduce = mapReduce;
+ this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if (!this.combineMap.isEmpty())
+ return true;
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
+ return false;
+ } else {
+ this.processNext();
+ return this.hasNext();
+ }
+ }
+
+ @Override
+ public Tuple2<OK, OV> next() {
+ if (!this.combineMap.isEmpty())
+ return this.nextFromCombineMap();
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
+ throw FastNoSuchElementException.instance();
+ } else {
+ this.processNext();
+ return this.next();
+ }
+ }
+
+ private void processNext() {
+ while (this.inputIterator.hasNext() && this.combineMap.size() < 500) {
+ final Tuple2<K, V> keyValue = this.inputIterator.next();
+ List<V> values = this.combineMap.get(keyValue._1());
+ if (null == values) {
+ values = new ArrayList<>();
+ this.combineMap.put(keyValue._1(), values);
+ }
+ values.add(keyValue._2());
+ if (values.size() > 1000)
+ break;
+ }
+ for (final K key : this.combineMap.keySet()) {
+ final List<V> values = this.combineMap.get(key);
+ if (values.size() > 1) {
+ this.combineMap.remove(key);
+ this.mapReduce.combine(key, values.iterator(), this.combineIteratorEmitter);
+ }
+ }
+ }
+
+ private Tuple2<OK, OV> nextFromCombineMap() {
+ final OK key = (OK) this.combineMap.keySet().iterator().next();
+ final List<OV> values = (List<OV>) this.combineMap.get(key);
+ final Tuple2<OK, OV> keyValue = new Tuple2<>(key, values.remove(0));
+ if (values.isEmpty())
+ this.combineMap.remove(key);
+ return keyValue;
+ }
+
+ private class CombineIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
+
+ @Override
+ public void emit(final OK key, OV value) {
+ List<V> values = combineMap.get(key);
+ if (null == values) {
+ values = new ArrayList<>();
+ combineMap.put((K) key, values);
+ }
+ values.add((V) value);
+ }
+ }
+}
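The committed `CombineIterator` buffers at most 500 distinct keys (breaking early once a single key holds more than 1000 values) before handing repeated keys to `MapReduce.combine()`. A simplified sketch of that bounded, map-side combining idea follows. It is not the committed code: `BoundedCombiningIterator`, its `maxKeys` parameter, and the `BinaryOperator` used in place of `MapReduce.combine()` and its emitter are assumptions for illustration; keys and values are assumed non-null, and `Map.entry` needs Java 9 or later.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.BinaryOperator;

// Hypothetical sketch: partially combine (key, value) pairs inside a bounded buffer, then flush.
final class BoundedCombiningIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<K, V>> input;
    private final BinaryOperator<V> combiner;  // must be associative and commutative, like a MapReduce combiner
    private final int maxKeys;                  // upper bound on distinct keys held in memory (assumed >= 1)
    private final Queue<Map.Entry<K, V>> ready = new ArrayDeque<>();

    BoundedCombiningIterator(final Iterator<Map.Entry<K, V>> input, final BinaryOperator<V> combiner, final int maxKeys) {
        this.input = input;
        this.combiner = combiner;
        this.maxKeys = maxKeys;
    }

    @Override
    public boolean hasNext() {
        if (this.ready.isEmpty() && this.input.hasNext())
            this.fill();
        return !this.ready.isEmpty();
    }

    @Override
    public Map.Entry<K, V> next() {
        if (!this.hasNext())
            throw new NoSuchElementException();
        return this.ready.remove();
    }

    // Pull input until the buffer holds maxKeys distinct keys (or input ends), folding equal keys as they arrive.
    private void fill() {
        final Map<K, V> buffer = new LinkedHashMap<>();
        while (this.input.hasNext() && buffer.size() < this.maxKeys) {
            final Map.Entry<K, V> kv = this.input.next();
            buffer.merge(kv.getKey(), kv.getValue(), this.combiner);
        }
        for (final Map.Entry<K, V> entry : buffer.entrySet())
            this.ready.add(Map.entry(entry.getKey(), entry.getValue()));
    }
}
```

Folding each value into the buffer as it arrives keeps one value per key, so memory is bounded by `maxKeys` rather than by how many values a key has. Because the buffer is flushed whenever it fills, the same key can be emitted more than once; that is acceptable for a combiner, since the reduce stage merges whatever remains.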
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
new file mode 100644
index 0000000..cdd62cb
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MapIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MapIterator<K, V> implements Iterator<Tuple2<K, V>> {
+
+ private final Iterator<Tuple2<Object, VertexWritable>> inputIterator;
+ private final MapReduce<K, V, ?, ?, ?> mapReduce;
+ private final Queue<Tuple2<K, V>> queue = new LinkedList<>();
+ private final MapIteratorEmitter mapIteratorEmitter = new MapIteratorEmitter();
+
+ public MapIterator(final MapReduce<K, V, ?, ?, ?> mapReduce, final Iterator<Tuple2<Object, VertexWritable>> inputIterator) {
+ this.inputIterator = inputIterator;
+ this.mapReduce = mapReduce;
+ this.mapReduce.workerStart(MapReduce.Stage.MAP);
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if (!this.queue.isEmpty())
+ return true;
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+ return false;
+ } else {
+ this.processNext();
+ return this.hasNext();
+ }
+ }
+
+ @Override
+ public Tuple2<K, V> next() {
+ if (!this.queue.isEmpty())
+ return this.queue.remove();
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+ throw FastNoSuchElementException.instance();
+ } else {
+ this.processNext();
+ return this.next();
+ }
+ }
+
+ private void processNext() {
+ this.mapReduce.map(ComputerGraph.mapReduce(this.inputIterator.next()._2().get()), this.mapIteratorEmitter);
+ }
+
+ private class MapIteratorEmitter implements MapReduce.MapEmitter<K, V> {
+
+ @Override
+ public void emit(final K key, V value) {
+ queue.add(new Tuple2<>(key, value));
+ }
+ }
+}
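`MapIterator` keeps memory flat by calling `map()` for the next input vertex only when its queue of pending emissions is empty. The same pull-on-demand pattern is sketched below with plain Java types; `EmittingIterator` and the `BiConsumer` standing in for `MapReduce.map(vertex, emitter)` are hypothetical, and emitted values are assumed non-null because `ArrayDeque` rejects nulls.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch of the MapIterator pattern: pull one input, let a callback emit 0..n outputs into a queue.
final class EmittingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> input;
    private final BiConsumer<I, Consumer<O>> mapper;  // stands in for MapReduce.map(vertex, emitter)
    private final Queue<O> queue = new ArrayDeque<>();

    EmittingIterator(final Iterator<I> input, final BiConsumer<I, Consumer<O>> mapper) {
        this.input = input;
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        // Refill only when the queue is empty, and only with the emissions of one input element at a time.
        while (this.queue.isEmpty() && this.input.hasNext())
            this.mapper.accept(this.input.next(), this.queue::add);
        return !this.queue.isEmpty();
    }

    @Override
    public O next() {
        if (!this.hasNext())
            throw new NoSuchElementException();
        return this.queue.remove();
    }
}
```

The refill here is a loop. The committed `hasNext()` and `next()` instead call themselves after each `processNext()`, so a long run of inputs that emit nothing adds one stack frame per input; a loop keeps the stack depth constant.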
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
new file mode 100644
index 0000000..e3127f7
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/ReduceIterator.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ReduceIterator<K, V, OK, OV> implements Iterator<Tuple2<OK, OV>> {
+
+ private final Iterator<Tuple2<K, Iterable<V>>> inputIterator;
+ private final MapReduce<K, V, OK, OV, ?> mapReduce;
+ private final Queue<Tuple2<OK, OV>> queue = new LinkedList<>();
+ private final ReduceIteratorEmitter reduceIteratorEmitter = new ReduceIteratorEmitter();
+
+ public ReduceIterator(final MapReduce<K, V, OK, OV, ?> mapReduce, final Iterator<Tuple2<K, Iterable<V>>> inputIterator) {
+ this.inputIterator = inputIterator;
+ this.mapReduce = mapReduce;
+ this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if (!this.queue.isEmpty())
+ return true;
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
+ return false;
+ } else {
+ this.processNext();
+ return this.hasNext();
+ }
+ }
+
+ @Override
+ public Tuple2<OK, OV> next() {
+ if (!this.queue.isEmpty())
+ return this.queue.remove();
+ else if (!this.inputIterator.hasNext()) {
+ this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
+ throw FastNoSuchElementException.instance();
+ } else {
+ this.processNext();
+ return this.next();
+ }
+ }
+
+ private void processNext() {
+ final Tuple2<K, Iterable<V>> nextKeyValues = this.inputIterator.next();
+ this.mapReduce.reduce(nextKeyValues._1(), nextKeyValues._2().iterator(), this.reduceIteratorEmitter);
+ }
+
+ private class ReduceIteratorEmitter implements MapReduce.ReduceEmitter<OK, OV> {
+
+ @Override
+ public void emit(final OK key, OV value) {
+ queue.add(new Tuple2<>(key, value));
+ }
+ }
+}
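On the reduce side the input arrives already grouped. Spark's documentation for `groupByKey()` notes that all values for any single key must fit in memory, which is the pressure the new `combine()` stage is meant to relieve. Given that grouping, the reduce itself can stay lazy with almost no state. The sketch below is hypothetical (`LazyReduceIterator` is not a TinkerPop class) and simplifies `MapReduce.reduce()` to exactly one output per key, whereas the real method may emit any number of pairs through its `ReduceEmitter`.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: reduce grouped (key, values) input lazily, one key per next() call.
final class LazyReduceIterator<K, V, R> implements Iterator<Map.Entry<K, R>> {
    private final Iterator<Map.Entry<K, Iterable<V>>> grouped;  // stands in for a groupByKey() partition
    private final BiFunction<K, Iterator<V>, R> reducer;        // stands in for MapReduce.reduce()

    LazyReduceIterator(final Iterator<Map.Entry<K, Iterable<V>>> grouped, final BiFunction<K, Iterator<V>, R> reducer) {
        this.grouped = grouped;
        this.reducer = reducer;
    }

    @Override
    public boolean hasNext() {
        return this.grouped.hasNext();
    }

    @Override
    public Map.Entry<K, R> next() {
        // Only the current key's values are touched; nothing is buffered between calls.
        // grouped.next() throws NoSuchElementException once the input is exhausted.
        final Map.Entry<K, Iterable<V>> group = this.grouped.next();
        final R reduced = this.reducer.apply(group.getKey(), group.getValue().iterator());
        return new AbstractMap.SimpleImmutableEntry<>(group.getKey(), reduced);
    }
}
```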
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index d3f31cb..5889bdb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -156,35 +156,24 @@ public final class SparkExecutor {
public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
- final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.MAP);
- final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
- workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
- if (!partitionIterator.hasNext())
- workerMapReduce.workerEnd(MapReduce.Stage.MAP);
- return mapEmitter.getEmissions();
- });
+ return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getMapKeySort().isPresent())
mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
return mapRDD;
}
- // TODO: public static executeCombine() is this necessary? YES --- we groupByKey in reduce() where we want to combine first.
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+ return mapRDD.mapPartitionsToPair(partitionIterator -> {
+ HadoopPools.initialize(apacheConfiguration);
+ return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+ });
+ }
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
- final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
- final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
- return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
- if (!partitionIterator.hasNext())
- workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
- return reduceEmitter.getEmissions();
- });
+ return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
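The commit message says the new combine stage limits GC pressure on the `groupByKey()` in reduce. The Spark-free simulation below shows the mechanism under stated assumptions: partitions are plain lists, the `MapReduce` is a word count with a summing combiner, and `ShuffleSimulation` is a hypothetical name. Combining within each partition, which is what `executeCombine()` does through `mapPartitionsToPair()`, shrinks the number of records the grouping step has to move and buffer while leaving the reduced totals unchanged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free simulation: map -> optional per-partition combine -> group by key -> reduce.
final class ShuffleSimulation {

    // Word-count style map stage: every word becomes (word, 1).
    static List<Map.Entry<String, Integer>> map(final List<String> partition) {
        final List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (final String word : partition)
            out.add(Map.entry(word, 1));
        return out;
    }

    // Combine stage: sum counts for equal keys before they leave the partition.
    static List<Map.Entry<String, Integer>> combine(final List<Map.Entry<String, Integer>> records) {
        final Map<String, Integer> sums = new HashMap<>();
        for (final Map.Entry<String, Integer> kv : records)
            sums.merge(kv.getKey(), kv.getValue(), Integer::sum);
        return new ArrayList<>(sums.entrySet());
    }

    // Runs the pipeline, writes the reduced totals into `totals`, and returns how many records were grouped.
    static int run(final List<List<String>> partitions, final boolean doCombine, final Map<String, Integer> totals) {
        final Map<String, List<Integer>> grouped = new HashMap<>();  // stands in for groupByKey()
        int moved = 0;
        for (final List<String> partition : partitions) {
            List<Map.Entry<String, Integer>> records = map(partition);
            if (doCombine)
                records = combine(records);
            for (final Map.Entry<String, Integer> kv : records) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
                moved++;
            }
        }
        // Reduce stage: sum each key's grouped values.
        for (final Map.Entry<String, List<Integer>> group : grouped.entrySet())
            totals.put(group.getKey(), group.getValue().stream().mapToInt(Integer::intValue).sum());
        return moved;
    }
}
```

For comparison, Spark's own `reduceByKey()` already merges values locally on each mapper before the shuffle. TinkerPop's `MapReduce` exposes `combine()` as a separate stage, so here it is applied explicitly per partition ahead of `groupByKey()`.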
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 30f4dba..58a0af6 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -237,15 +237,16 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
mapReduce.storeState(newApacheConfiguration);
// map
final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
- // combine TODO: is this really needed?
+ // combine
+ final JavaPairRDD combineRDD = mapReduce.doStage(MapReduce.Stage.COMBINE) ? SparkExecutor.executeCombine(mapRDD, newApacheConfiguration) : mapRDD;
// reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+ final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
// write the map reduce output back to disk (memory)
try {
mapReduce.addResultToMemory(finalMemory,
hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
.newInstance()
- .writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), null == reduceRDD ? mapRDD : reduceRDD));
+ .writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e.getMessage(), e);
}
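After this change each optional stage in `SparkGraphComputer` either transforms the previous stage's output or passes it through, so the RDD handed to `writeMemoryRDD()` is always the output of the last stage that ran and the old `null == reduceRDD ? mapRDD : reduceRDD` check disappears. A hypothetical, type-simplified sketch of that selection logic follows; `StageChain` and `runStages` are illustrative names, and the real stages also change key and value types, which a `UnaryOperator` cannot express.

```java
import java.util.function.UnaryOperator;

// Hypothetical, type-simplified sketch of the stage selection in SparkGraphComputer.
final class StageChain {
    static <T> T runStages(final T mapOutput,
                           final boolean doCombine, final UnaryOperator<T> combine,
                           final boolean doReduce, final UnaryOperator<T> reduce) {
        // Each optional stage either transforms the previous output or passes it through unchanged,
        // so there is no null placeholder to check for before writing the result.
        final T combined = doCombine ? combine.apply(mapOutput) : mapOutput;
        return doReduce ? reduce.apply(combined) : combined;
    }
}
```

Read from the diff, one consequence is that a job that runs combine but skips reduce now writes the combine output rather than the raw map output.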
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
deleted file mode 100644
index cf31249..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
-
- private List<Tuple2<K, V>> emissions = new ArrayList<>();
-
- @Override
- public void emit(final K key, final V value) {
- this.emissions.add(new Tuple2<>(key, value));
- }
-
- public Iterator<Tuple2<K, V>> getEmissions() {
- final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
- this.emissions = new ArrayList<>();
- return iterator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/df8d4b15/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
deleted file mode 100644
index e2252fa..0000000
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
-
- private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
-
- @Override
- public void emit(final OK key, final OV value) {
- this.emissions.add(new Tuple2<>(key, value));
- }
-
- public Iterator<Tuple2<OK, OV>> getEmissions() {
- final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
- this.emissions = new ArrayList<>();
- return iterator;
- }
-}