You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/05/28 15:24:45 UTC

incubator-tinkerpop git commit: fixed a bug in MapReduce result serialization in Spark. Hmmm. Went back to the old model requiring HDFS.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 27cb6ba31 -> f1b142748


fixed a bug in MapReduce result serialization in Spark. Hmmm. Went back to the old model requiring HDFS.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/f1b14274
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/f1b14274
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/f1b14274

Branch: refs/heads/master
Commit: f1b1427480e80eec2bc934de1f410d47c2c16edf
Parents: 27cb6ba
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 28 07:24:49 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 28 07:24:57 2015 -0600

----------------------------------------------------------------------
 .../hadoop/process/computer/spark/SparkExecutor.java        | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/f1b14274/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 69c373b..b7268b8 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -204,14 +204,19 @@ public final class SparkExecutor {
     //////////////////
 
     public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
-        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION,null);
         if (null != outputLocation) {
             // map back to a Hadoop stream for output
             mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
                     ObjectWritable.class,
                     ObjectWritable.class,
                     SequenceFileOutputFormat.class, hadoopConfiguration);
-           mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+           // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+            try {
+            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
         }
     }
 }