You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/05/28 15:24:45 UTC
incubator-tinkerpop git commit: a bug in MapReduce result serialization in Spark. Hmmm. Went back to the old model requiring HDFS.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 27cb6ba31 -> f1b142748
a bug in MapReduce result serialization in Spark. Hmmm. Went back to the old model requiring HDFS.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f1b14274
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f1b14274
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f1b14274
Branch: refs/heads/master
Commit: f1b1427480e80eec2bc934de1f410d47c2c16edf
Parents: 27cb6ba
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 28 07:24:49 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 28 07:24:57 2015 -0600
----------------------------------------------------------------------
.../hadoop/process/computer/spark/SparkExecutor.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f1b14274/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 69c373b..b7268b8 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -204,14 +204,19 @@ public final class SparkExecutor {
//////////////////
public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION,null);
if (null != outputLocation) {
// map back to a Hadoop stream for output
mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
ObjectWritable.class,
ObjectWritable.class,
SequenceFileOutputFormat.class, hadoopConfiguration);
- mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+ // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+ try {
+ mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
}
}
}