Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:09 UTC

[14/20] incubator-tinkerpop git commit: Lots of cleanup and organization. SparkGraphComputer is now really clean, with all the dirty work done by SparkHelper.

Lots of cleanup and organization. SparkGraphComputer is now really clean, with all the dirty work done by SparkHelper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b6133ae7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b6133ae7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b6133ae7

Branch: refs/heads/master
Commit: b6133ae75e4f8ebb29f0da042c37e9ce09d92ca4
Parents: 8246ee6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 15:32:05 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 15:32:05 2015 -0700

----------------------------------------------------------------------
 .../computer/giraph/GiraphGraphComputer.java    |  10 +-
 .../computer/spark/GraphComputerRDD.java        | 106 -----------
 .../process/computer/spark/RuleAccumulator.java |   2 +-
 .../computer/spark/SparkGraphComputer.java      | 158 ++++++-----------
 .../process/computer/spark/SparkMapEmitter.java |   2 +-
 .../process/computer/spark/SparkMemory.java     |   1 -
 .../computer/spark/SparkMemoryAccumulator.java  |   2 +-
 .../process/computer/spark/SparkMessenger.java  |  31 +++-
 .../computer/spark/SparkReduceEmitter.java      |   2 +-
 .../process/computer/spark/ToyVertex.java       | 114 ------------
 .../computer/spark/util/SparkHelper.java        | 177 +++++++++++++++++++
 .../hadoop/structure/HadoopConfiguration.java   |   5 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   |   2 +-
 13 files changed, 267 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 589c22c..56d029c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -221,14 +221,8 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
     }
 
     public static void main(final String[] args) throws Exception {
-        try {
-            final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-            final GiraphGraphComputer computer = new GiraphGraphComputer(HadoopGraph.open(configuration));
-            computer.program(VertexProgram.createVertexProgram(configuration)).submit().get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new GiraphGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
     }
 
     @Override

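The removed try/catch added nothing: it printed the stack trace and then rethrew, so the trace appeared twice. For reference, a minimal sketch of the equivalent programmatic launch (the properties path is hypothetical; the classes and calls come from the diff above):

    import org.apache.commons.configuration.FileConfiguration;
    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
    import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
    import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
    import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

    public final class GiraphLaunchSketch {
        public static void main(final String[] args) throws Exception {
            // hypothetical properties file describing the graph and a serialized vertex program
            final FileConfiguration configuration = new PropertiesConfiguration("conf/hadoop-gryo.properties");
            final ComputerResult result = new GiraphGraphComputer(HadoopGraph.open(configuration))
                    .program(VertexProgram.createVertexProgram(configuration))
                    .submit().get();
            System.out.println(result); // exceptions now propagate once, uncaught, to the JVM
        }
    }
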
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
deleted file mode 100644
index 786e5af..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.function.FlatMapFunction2;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-import scala.reflect.ManifestFactory;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>> {
-
-    public GraphComputerRDD(final RDD<Tuple2<Object, SparkMessenger<M>>> rdd) {
-        super(rdd, ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
-    }
-
-    public GraphComputerRDD(final JavaPairRDD<Object, SparkMessenger<M>> rdd) {
-        super(rdd.rdd(), ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
-    }
-
-    public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
-        JavaPairRDD<Object, SparkMessenger<M>> current = this;
-        // execute vertex program
-        current = current.mapPartitionsToPair(iterator -> {
-            final VertexProgram<M> vertexProgram = VertexProgram.createVertexProgram(configuration);
-            return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
-                vertexProgram.execute(tuple._2().vertex, tuple._2(), memory);
-                return tuple;
-            });
-        });
-        // clear all previous incoming messages
-        if (!memory.isInitialIteration()) {
-            current = current.mapValues(messenger -> {
-                messenger.clearIncomingMessages();
-                return messenger;
-            });
-        }
-        // emit messages
-        current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
-            final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().outgoing.entrySet()
-                    .stream()
-                    .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new ToyVertex(entry.getKey()), entry.getValue())))
-                    .collect(Collectors.toList());          // the message vertices
-            list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
-            return list;
-        });
-        // "message pass" via reduction
-        current = current.reduceByKey((a, b) -> {
-            if (a.vertex instanceof ToyVertex && !(b.vertex instanceof ToyVertex))
-                a.vertex = b.vertex;
-            a.incoming.addAll(b.incoming);
-            return a;
-        });
-        // clear all previous outgoing messages
-        current = current.mapValues(messenger -> {
-            messenger.clearOutgoingMessages();
-            return messenger;
-        });
-        return GraphComputerRDD.of(current);
-    }
-
-    public static <M> GraphComputerRDD<M> of(final JavaPairRDD<Object, SparkMessenger<M>> javaPairRDD) {
-        return new GraphComputerRDD<>(javaPairRDD);
-    }
-
-    public static <M> GraphComputerRDD<M> of(final JavaRDD<Tuple2<Object, SparkMessenger<M>>> javaRDD) {
-        return new GraphComputerRDD<>(javaRDD.rdd());
-    }
-
-    //////////////
-
-    // TODO: What the hell is this for?
-    @Override
-    public JavaRDD zipPartitions(JavaRDDLike uJavaRDDLike, FlatMapFunction2 iteratorIteratorVFlatMapFunction2) {
-        return (JavaRDD) new JavaRDD<>(null, null);
-    }
-
-}

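GraphComputerRDD's execute() logic moves into the new SparkHelper.executeStep() further down this commit. The heart of it is the "message pass" via reduction: each vertex re-keys a stub copy of itself under every recipient id, carrying the outgoing messages, and reduceByKey() folds the stubs into the real vertex's inbox. A minimal standalone sketch of that shape, assuming only spark-core with the Spark 1.x Java API (flatMapToPair returning an Iterable, as in this commit) and a local master:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class MessagePassSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("message-pass").setMaster("local"));
            try {
                final JavaPairRDD<Long, List<String>> inboxes = sc.parallelize(Arrays.asList(1L, 2L, 3L))
                        .flatMapToPair(id -> {
                            final List<Tuple2<Long, List<String>>> out = new ArrayList<>();
                            out.add(new Tuple2<>(id, new ArrayList<String>()));  // the raw vertex, empty inbox
                            out.add(new Tuple2<>(id + 1, new ArrayList<String>(Arrays.asList("hi from " + id)))); // a stub keyed by the recipient
                            return out;
                        })
                        .reduceByKey((a, b) -> { a.addAll(b); return a; });      // the "message pass": stubs fold into the vertex's inbox
                System.out.println(inboxes.collect()); // e.g. (1,[]), (2,[hi from 1]), (3,[hi from 2]), (4,[hi from 3])
            } finally {
                sc.close();
            }
        }
    }
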
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
index 59da2f4..446dbdb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RuleAccumulator implements AccumulatorParam<Rule> {
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
 
     @Override
     public Rule addAccumulator(final Rule a, final Rule b) {

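Beyond the final modifier, it helps to know the contract being implemented: AccumulatorParam is Spark 1.x's hook for custom accumulators, and SparkMemory (below) registers one Rule accumulator per memory key so workers can merge memory updates. A minimal sketch of the same three-method contract with a plain Long sum (a hypothetical class, not part of this commit):

    import org.apache.spark.AccumulatorParam;

    public final class LongSumAccumulator implements AccumulatorParam<Long> {
        @Override
        public Long addAccumulator(final Long total, final Long value) {
            return total + value; // fold one new value into a partition's running total
        }

        @Override
        public Long addInPlace(final Long a, final Long b) {
            return a + b;         // merge two partial totals from different partitions
        }

        @Override
        public Long zero(final Long initialValue) {
            return 0L;            // the identity element
        }
    }

    // usage sketch: sparkContext.accumulator(0L, new LongSumAccumulator())
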
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 946d2af..dd004bc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -22,20 +22,15 @@ import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.FileConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
@@ -54,9 +49,6 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -66,12 +58,11 @@ import java.util.stream.Stream;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkGraphComputer implements GraphComputer {
+public final class SparkGraphComputer implements GraphComputer {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SparkGraphComputer.class);
 
     protected final SparkConf configuration = new SparkConf();
-
     protected final HadoopGraph hadoopGraph;
     private boolean executed = false;
     private final Set<MapReduce> mapReducers = new HashSet<>();
@@ -116,137 +107,99 @@ public class SparkGraphComputer implements GraphComputer {
         if (null == this.vertexProgram && this.mapReducers.isEmpty())
             throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
         // it is possible to run mapreducers without a vertex program
-        if (null != this.vertexProgram)
+        if (null != this.vertexProgram) {
             GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
-
+            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
+        }
+        // apache and hadoop configurations that are used throughout
         final org.apache.commons.configuration.Configuration apacheConfiguration = this.hadoopGraph.configuration();
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
 
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
                     final long startTime = System.currentTimeMillis();
                     SparkMemory memory = null;
-                    // load the graph
+                    SparkHelper.deleteOutputDirectory(hadoopConfiguration);
+                    ////////////////////////////////
+                    // process the vertex program //
+                    ////////////////////////////////
                     if (null != this.vertexProgram) {
+                        // set up the spark job
                         final SparkConf sparkConfiguration = new SparkConf();
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
-
-                        // set up the input format
+                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         ///
                         try {
-                            final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                            // create a message-passing friendly rdd from the hadoop input format
+                            JavaPairRDD<Object, SparkMessenger<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
-                                    VertexWritable.class);
-                            final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-                            GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
+                                    VertexWritable.class)
+                                    .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
 
-                            // set up the vertex program
+                            // set up the vertex program and wire up configurations
                             memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                             this.vertexProgram.setup(memory);
                             final SerializableConfiguration vertexProgramConfiguration = new SerializableConfiguration();
                             this.vertexProgram.storeState(vertexProgramConfiguration);
-                            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
                             ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                             ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+
                             // execute the vertex program
-                            while (true) {
-                                g = g.execute(vertexProgramConfiguration, memory);
-                                g.foreachPartition(iterator -> doNothing());
+                            do {
+                                graphRDD = SparkHelper.executeStep(graphRDD, this.vertexProgram, memory, vertexProgramConfiguration);
+                                graphRDD.foreachPartition(iterator -> doNothing()); // a cheap action to force evaluation of the RDD
+                                graphRDD.cache(); // TODO: learn about persistence and caching
                                 memory.incrIteration();
-                                if (this.vertexProgram.terminate(memory))
-                                    break;
-                            }
+                            } while (!this.vertexProgram.terminate(memory));
+
                             // write the output graph back to disk
-                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                            if (null != outputLocation) {
-                                try {
-                                    FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
-                                } catch (final IOException e) {
-                                    throw new IllegalStateException(e.getMessage(), e);
-                                }
-                                // map back to a <nullwritable,vertexwritable> stream for output
-                                g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
-                                        .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
-                                                NullWritable.class,
-                                                VertexWritable.class,
-                                                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
-                            }
+                            SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
                         } finally {
+                            // must close the context or bad things happen
                             sparkContext.close();
                         }
+                        sparkContext.close(); // close once more for good measure (TODO: is this second close necessary?)
                     }
 
+                    //////////////////////////////
+                    // process the map reducers //
+                    //////////////////////////////
                     final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
-                    // execute mapreduce jobs
                     for (final MapReduce mapReduce : this.mapReducers) {
-                        // set up the map reduce job
-                        final SerializableConfiguration newConfiguration = new SerializableConfiguration(apacheConfiguration);
-                        mapReduce.storeState(newConfiguration);
-
-                        // set up spark job
+                        // set up the spark job
                         final SparkConf sparkConfiguration = new SparkConf();
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
                             hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
-                        // set up the input format
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+                        // execute the map reduce job
                         try {
-                            final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                            final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
                                     VertexWritable.class);
 
+                            final SerializableConfiguration newApacheConfiguration = new SerializableConfiguration(apacheConfiguration);
+                            mapReduce.storeState(newApacheConfiguration);
                             // map
-                            JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
-                                final MapReduce m = MapReduce.createMapReduce(newConfiguration);
-                                final SparkMapEmitter mapEmitter = new SparkMapEmitter();
-                                m.map(tuple._2().get(), mapEmitter);
-                                return mapEmitter.getEmissions();
-                            });
-                            if (mapReduce.getMapKeySort().isPresent())
-                                mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
-                            // todo: combine
+                            final JavaPairRDD mapRDD = SparkHelper.executeMap(hadoopGraphRDD, mapReduce, newApacheConfiguration);
+                            // TODO: combine
                             // reduce
-                            JavaPairRDD<?, ?> reduceRDD = null;
-                            if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                                reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
-                                    final MapReduce m = MapReduce.createMapReduce(newConfiguration);
-                                    final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
-                                    m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
-                                    return reduceEmitter.getEmissions();
-                                });
-                                if (mapReduce.getReduceKeySort().isPresent())
-                                    reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
-                            }
-                            // write the output graph back to disk
-                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                            if (null != outputLocation) {
-                                // map back to a Hadoop stream for output
-                                ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                                        ObjectWritable.class,
-                                        ObjectWritable.class,
-                                        (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
-                                // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
-                                // to get results you have to look through HDFS directory structure. Oh the horror.
-                                try {
-                                    if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
-                                        mapReduce.addResultToMemory(finalMemory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-                                    else
-                                        HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
-                                } catch (final IOException e) {
-                                    throw new IllegalStateException(e.getMessage(), e);
-                                }
-                            }
+                            final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+
+                            // write the map reduce output back to disk (memory)
+                            SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                         } finally {
+                            // must close the context or bad things happen
                             sparkContext.close();
                         }
+                        sparkContext.close(); // close once more for good measure (TODO: is this second close necessary?)
                     }
 
                     // update runtime and return the newly computed graph
@@ -256,6 +209,8 @@ public class SparkGraphComputer implements GraphComputer {
         );
     }
 
+    /////////////////
+
     private static final void doNothing() {
         // a cheap action
     }
@@ -278,19 +233,18 @@ public class SparkGraphComputer implements GraphComputer {
         }
     }
 
-    /////////////////
-
     public static void main(final String[] args) throws Exception {
-        final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-gryo.properties");
-        // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-        final HadoopGraph graph = HadoopGraph.open(configuration);
-        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
-        // TODO: remove everything below
-        System.out.println(result);
-        //result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
-        //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
-        result.graph().V().valueMap().forEachRemaining(System.out::println);
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
     }
 
-
+    @Override
+    public Features features() {
+        return new Features() {
+            @Override
+            public boolean supportsNonSerializableObjects() {
+                return true;  // TODO
+            }
+        };
+    }
 }

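Two details of the rewritten vertex-program loop deserve a note: Spark transformations are lazy, so the no-op foreachPartition() serves as a cheap action that forces each BSP iteration to actually execute, and cache() keeps that iteration's result around for the next pass (the commit calls cache() after the action, hence its TODO). A minimal standalone sketch of the pattern, assuming only spark-core and a local master:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public final class ForcedIterationSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("forced-iteration").setMaster("local"));
            try {
                JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3));
                int iteration = 0;
                do {
                    rdd = rdd.map(x -> x + 1).cache();    // cache before the action so the computed step is reused
                    rdd.foreachPartition(iterator -> {}); // a cheap action: forces this iteration to run now
                    iteration++;
                } while (iteration < 3);
                System.out.println(rdd.collect());        // [4, 5, 6]
            } finally {
                sc.close();
            }
        }
    }
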
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 0f5acc1..6cd8885 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
 
     private final List<Tuple2<K, V>> emissions = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index b277e83..90bc73a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -60,7 +60,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         for (final String key : this.memoryKeys) {
             this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), new RuleAccumulator()));
         }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
index 470774a..10b9525 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
@@ -23,7 +23,7 @@ import org.apache.spark.AccumulatorParam;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
+public final class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
     @Override
     public SparkMemory addAccumulator(final SparkMemory first, final SparkMemory second) {
         return first;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index cc170c4..812bdd3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -28,28 +28,31 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public class SparkMessenger<M> implements Serializable, Messenger<M> {
 
-    protected Vertex vertex;
-    protected List<M> incoming;
-    protected Map<Object, List<M>> outgoing = new HashMap<>();
+    private Vertex vertex;
+    private List<M> incoming;
+    private Map<Object, List<M>> outgoing = new HashMap<>();
 
     public SparkMessenger() {
 
     }
 
+    public SparkMessenger(final Vertex vertex) {
+        this.vertex = vertex;
+        this.incoming = new ArrayList<>();
+    }
+
     public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
         this.vertex = vertex;
         this.incoming = incomingMessages;
@@ -63,6 +66,22 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
         this.outgoing.clear();
     }
 
+    public Vertex getVertex() {
+        return this.vertex;
+    }
+
+    public void setVertex(final Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public void addIncomingMessages(final SparkMessenger<M> otherMessenger) {
+        this.incoming.addAll(otherMessenger.incoming);
+    }
+
+    public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
+        return this.outgoing.entrySet();
+    }
+
     @Override
     public Iterable<M> receiveMessages(final MessageScope messageScope) {
         return this.incoming;

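The new accessors exist so SparkHelper can merge two messengers for the same vertex id during the reduce step without reaching into fields. A minimal sketch of that merge, using the same DetachedVertex construction SparkHelper itself uses (the id and messages are hypothetical):

    import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessenger;
    import org.apache.tinkerpop.gremlin.structure.Vertex;
    import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;

    import java.util.Arrays;
    import java.util.Collections;

    public final class MessengerMergeSketch {
        public static void main(final String[] args) {
            final Vertex stub = new DetachedVertex(1L, Vertex.DEFAULT_LABEL, Collections.emptyMap());
            final SparkMessenger<String> vertexSide = new SparkMessenger<>(stub); // new convenience constructor: empty inbox
            final SparkMessenger<String> messageSide = new SparkMessenger<>(stub, Arrays.asList("m1", "m2"));
            vertexSide.addIncomingMessages(messageSide); // vertexSide's inbox now holds m1 and m2
        }
    }
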
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
index b9f056c..77e7072 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
 
     private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
deleted file mode 100644
index 121ae2d..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
-* @author Marko A. Rodriguez (http://markorodriguez.com)
-*/
-public final class ToyVertex implements Vertex, Vertex.Iterators, Serializable {
-
-    private final Object id;
-    private static final String TOY_VERTEX = "toyVertex";
-
-    public ToyVertex(final Object id) {
-        this.id = id;
-    }
-
-    ToyVertex() {
-        this.id = null;
-    }
-
-    @Override
-    public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object id() {
-        return this.id;
-    }
-
-    @Override
-    public String label() {
-        return TOY_VERTEX;
-    }
-
-    @Override
-    public Graph graph() {
-        return EmptyGraph.instance();
-    }
-
-    @Override
-    public <V> VertexProperty<V> property(final String key, final V value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterators iterators() {
-        return this;
-    }
-
-    @Override
-    public Iterator<Edge> edgeIterator(Direction direction, String... edgeLabels) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public Iterator<Vertex> vertexIterator(Direction direction, String... edgeLabels) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public <V> Iterator<VertexProperty<V>> propertyIterator(String... propertyKeys) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public int hashCode() {
-        return ElementHelper.hashCode(this);
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        return ElementHelper.areEqual(this, other);
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.vertexString(this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
new file mode 100644
index 0000000..ece9d7c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessenger;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkHelper {
+
+    private SparkHelper() {
+    }
+
+    public static <M> JavaPairRDD<Object, SparkMessenger<M>> executeStep(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final VertexProgram<M> globalVertexProgram, final SparkMemory memory, final Configuration apacheConfiguration) {
+        JavaPairRDD<Object, SparkMessenger<M>> current = graphRDD;
+        // execute vertex program
+        current = current.mapPartitionsToPair(iterator -> {     // each partition has a copy of the vertex program
+            final VertexProgram<M> vertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+            return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
+                vertexProgram.execute(tuple._2().getVertex(), tuple._2(), memory);
+                return tuple;
+            });
+        });
+        // clear all previous incoming messages
+        if (!memory.isInitialIteration()) {
+            current = current.mapValues(messenger -> {
+                messenger.clearIncomingMessages();
+                return messenger;
+            });
+        }
+        // emit messages
+        current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
+            final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().getOutgoingMessages()
+                    .stream()
+                    .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new DetachedVertex(entry.getKey(), Vertex.DEFAULT_LABEL, Collections.emptyMap()), entry.getValue()))) // maybe go back to toy vertex if label is expensive
+                    .collect(Collectors.toList());          // the message vertices
+            list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
+            return list;
+        });
+
+        // TODO: local message combiner
+        if (globalVertexProgram.getMessageCombiner().isPresent()) {
+           /* current = current.combineByKey(messenger -> {
+                return messenger;
+            });*/
+        }
+
+        // "message pass" via reduction
+        current = current.reduceByKey((a, b) -> {
+            if (a.getVertex() instanceof DetachedVertex && !(b.getVertex() instanceof DetachedVertex))
+                a.setVertex(b.getVertex());
+            a.addIncomingMessages(b);
+            return a;
+        });
+
+        // clear all previous outgoing messages
+        current = current.mapValues(messenger -> {
+            messenger.clearOutgoingMessages();
+            return messenger;
+        });
+        return current;
+    }
+
+    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.flatMapToPair(tuple -> {
+            final MapReduce<K, V, ?, ?, ?> m = MapReduce.createMapReduce(apacheConfiguration);    // TODO: create only once per partition
+            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+            m.map(tuple._2().get(), mapEmitter);
+            return mapEmitter.getEmissions();
+        });
+        if (mapReduce.getMapKeySort().isPresent())
+            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+        return mapRDD;
+    }
+
+    // TODO: public static executeCombine()
+
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
+            final MapReduce<K, V, OK, OV, ?> m = MapReduce.createMapReduce(apacheConfiguration);     // TODO: create only once per partition
+            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+            m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
+            return reduceEmitter.getEmissions();
+        });
+        if (mapReduce.getReduceKeySort().isPresent())
+            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+        return reduceRDD;
+    }
+
+    public static void deleteOutputDirectory(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            try {
+                FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+
+    public static <M> void saveVertexProgramRDD(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().getVertex())))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+        }
+    }
+
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a Hadoop stream for output
+            mapReduceRDD.mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+                    ObjectWritable.class,
+                    ObjectWritable.class,
+                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
+            // if it's not a SequenceFile, there is no certain way to convert to the necessary Java objects.
+            // to get results you have to look through the HDFS directory structure. Oh the horror.
+            try {
+                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+                else
+                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+}

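executeMap() and executeReduce() boil TinkerPop's MapReduce contract down to two Spark idioms: flatMapToPair() for the map stage and groupByKey().flatMapToPair() for the reduce stage, with an optional sortByKey() after either. A minimal word-count sketch of the same shape, assuming only spark-core with the Spark 1.x Java API and a local master:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class MapReduceShapeSketch {
        public static void main(final String[] args) {
            final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("mr-shape").setMaster("local"));
            try {
                // map stage: emit (word, 1) pairs, as executeMap() does through SparkMapEmitter
                final JavaPairRDD<String, Long> mapRDD = sc.parallelize(Arrays.asList("a b", "b c b"))
                        .flatMapToPair(line -> {
                            final List<Tuple2<String, Long>> emissions = new ArrayList<>();
                            for (final String word : line.split(" "))
                                emissions.add(new Tuple2<String, Long>(word, 1L));
                            return emissions;
                        });
                // reduce stage: group by key, emit one reduced pair per key, as executeReduce() does through SparkReduceEmitter
                final JavaPairRDD<String, Long> reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
                    long sum = 0L;
                    for (final Long count : tuple._2())
                        sum += count;
                    return Arrays.asList(new Tuple2<String, Long>(tuple._1(), sum));
                });
                System.out.println(reduceRDD.collect()); // e.g. [(a,1), (b,3), (c,1)]
            } finally {
                sc.close();
            }
        }
    }
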
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 76636cd..0fb2e9f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.util.StreamFactory;
@@ -86,9 +85,9 @@ public class HadoopConfiguration extends BaseConfiguration implements Serializab
         this.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
     }
 
-    public Class<? extends GraphComputer> getGraphComputer() {
+    public Class<? extends GraphComputer> getGraphComputer(final Class<? extends GraphComputer> defaultGraphComputer) {
         if (!this.containsKey(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER))
-            return GiraphGraphComputer.class;
+            return defaultGraphComputer;
         else {
             try {
                 return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 8154888..990fc77 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -146,7 +146,7 @@ public class HadoopGraph implements Graph, Graph.Iterators {
 
     private HadoopGraph(final Configuration configuration) {
         this.configuration = new HadoopConfiguration(configuration);
-        this.graphComputerClass = this.configuration.getGraphComputer();
+        this.graphComputerClass = this.configuration.getGraphComputer(GiraphGraphComputer.class);
     }
 
     public static HadoopGraph open() {