Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/03 23:31:59 UTC

incubator-tinkerpop git commit: Lots of cleanup and organization. SparkGraphComputer is now really clean, with all the dirty work done by SparkHelper.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/spark 8246ee6d5 -> b6133ae75


Lots of cleanup and organization. SparkGraphComputer is now really clean, with all the dirty work done by SparkHelper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b6133ae7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b6133ae7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b6133ae7

Branch: refs/heads/spark
Commit: b6133ae75e4f8ebb29f0da042c37e9ce09d92ca4
Parents: 8246ee6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 15:32:05 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 15:32:05 2015 -0700

----------------------------------------------------------------------
 .../computer/giraph/GiraphGraphComputer.java    |  10 +-
 .../computer/spark/GraphComputerRDD.java        | 106 -----------
 .../process/computer/spark/RuleAccumulator.java |   2 +-
 .../computer/spark/SparkGraphComputer.java      | 158 ++++++-----------
 .../process/computer/spark/SparkMapEmitter.java |   2 +-
 .../process/computer/spark/SparkMemory.java     |   1 -
 .../computer/spark/SparkMemoryAccumulator.java  |   2 +-
 .../process/computer/spark/SparkMessenger.java  |  31 +++-
 .../computer/spark/SparkReduceEmitter.java      |   2 +-
 .../process/computer/spark/ToyVertex.java       | 114 ------------
 .../computer/spark/util/SparkHelper.java        | 177 +++++++++++++++++++
 .../hadoop/structure/HadoopConfiguration.java   |   5 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   |   2 +-
 13 files changed, 267 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 589c22c..56d029c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -221,14 +221,8 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
     }
 
     public static void main(final String[] args) throws Exception {
-        try {
-            final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-            final GiraphGraphComputer computer = new GiraphGraphComputer(HadoopGraph.open(configuration));
-            computer.program(VertexProgram.createVertexProgram(configuration)).submit().get();
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        }
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new GiraphGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
     }
 
     @Override

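The removed try/catch only printed the stack trace before rethrowing, so letting the exception propagate unchanged is behaviorally equivalent and shorter. For anyone driving the same entry point from their own code, a minimal sketch (the class name RunGiraphComputer and the runtime printout are illustrative additions, not part of this commit):

    import org.apache.commons.configuration.FileConfiguration;
    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
    import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
    import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
    import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

    public final class RunGiraphComputer {
        public static void main(final String[] args) throws Exception {
            // args[0]: path to a properties file describing the graph and the vertex program
            final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
            final ComputerResult result = new GiraphGraphComputer(HadoopGraph.open(configuration))
                    .program(VertexProgram.createVertexProgram(configuration))
                    .submit().get(); // submit() is asynchronous; get() blocks for the result
            System.out.println("runtime (ms): " + result.memory().getRuntime());
        }
    }
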
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
deleted file mode 100644
index 786e5af..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.function.FlatMapFunction2;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-import scala.reflect.ManifestFactory;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>> {
-
-    public GraphComputerRDD(final RDD<Tuple2<Object, SparkMessenger<M>>> rdd) {
-        super(rdd, ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
-    }
-
-    public GraphComputerRDD(final JavaPairRDD<Object, SparkMessenger<M>> rdd) {
-        super(rdd.rdd(), ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
-    }
-
-    public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
-        JavaPairRDD<Object, SparkMessenger<M>> current = this;
-        // execute vertex program
-        current = current.mapPartitionsToPair(iterator -> {
-            final VertexProgram<M> vertexProgram = VertexProgram.createVertexProgram(configuration);
-            return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
-                vertexProgram.execute(tuple._2().vertex, tuple._2(), memory);
-                return tuple;
-            });
-        });
-        // clear all previous incoming messages
-        if (!memory.isInitialIteration()) {
-            current = current.mapValues(messenger -> {
-                messenger.clearIncomingMessages();
-                return messenger;
-            });
-        }
-        // emit messages
-        current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
-            final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().outgoing.entrySet()
-                    .stream()
-                    .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new ToyVertex(entry.getKey()), entry.getValue())))
-                    .collect(Collectors.toList());          // the message vertices
-            list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
-            return list;
-        });
-        // "message pass" via reduction
-        current = current.reduceByKey((a, b) -> {
-            if (a.vertex instanceof ToyVertex && !(b.vertex instanceof ToyVertex))
-                a.vertex = b.vertex;
-            a.incoming.addAll(b.incoming);
-            return a;
-        });
-        // clear all previous outgoing messages
-        current = current.mapValues(messenger -> {
-            messenger.clearOutgoingMessages();
-            return messenger;
-        });
-        return GraphComputerRDD.of(current);
-    }
-
-    public static <M> GraphComputerRDD<M> of(final JavaPairRDD<Object, SparkMessenger<M>> javaPairRDD) {
-        return new GraphComputerRDD<>(javaPairRDD);
-    }
-
-    public static <M> GraphComputerRDD<M> of(final JavaRDD<Tuple2<Object, SparkMessenger<M>>> javaRDD) {
-        return new GraphComputerRDD<>(javaRDD.rdd());
-    }
-
-    //////////////
-
-    // TODO: What the hell is this for?
-    @Override
-    public JavaRDD zipPartitions(JavaRDDLike uJavaRDDLike, FlatMapFunction2 iteratorIteratorVFlatMapFunction2) {
-        return (JavaRDD) new JavaRDD<>(null, null);
-    }
-
-}

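GraphComputerRDD is deleted because subclassing JavaPairRDD drags in Scala class manifests and abstract methods the class never used (hence the zipPartitions stub above). The new SparkHelper, introduced later in this diff, favors composition instead: static methods that accept and return plain JavaPairRDDs. A minimal sketch of that shape, with hypothetical names (PairRddOps, identityStep) for illustration:

    import org.apache.spark.api.java.JavaPairRDD;

    // Composition instead of inheritance: take a plain JavaPairRDD and hand one
    // back, so no ClassTags/manifests or unwanted overrides are ever required.
    public final class PairRddOps {

        private PairRddOps() {
        }

        public static <K, V> JavaPairRDD<K, V> identityStep(final JavaPairRDD<K, V> rdd) {
            return rdd.mapValues(v -> v); // stand-in for a real transformation step
        }
    }
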
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
index 59da2f4..446dbdb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class RuleAccumulator implements AccumulatorParam<Rule> {
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
 
     @Override
     public Rule addAccumulator(final Rule a, final Rule b) {

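Marking the class final is a small hardening; the merge logic is untouched. For readers unfamiliar with Spark's accumulator contract, a minimal sketch of an AccumulatorParam that sums longs instead of merging Rules (illustrative, not part of this commit) -- zero() supplies the initial value, addAccumulator() folds in one new value, and addInPlace() merges partial results from different executors:

    import org.apache.spark.AccumulatorParam;

    public final class LongSumAccumulator implements AccumulatorParam<Long> {

        @Override
        public Long addAccumulator(final Long a, final Long b) {
            return a + b; // fold one new value into the running total
        }

        @Override
        public Long addInPlace(final Long a, final Long b) {
            return a + b; // merge two partial totals
        }

        @Override
        public Long zero(final Long initialValue) {
            return 0L; // identity element for the sum
        }
    }
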
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 946d2af..dd004bc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -22,20 +22,15 @@ import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.commons.configuration.FileConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
@@ -54,9 +49,6 @@ import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -66,12 +58,11 @@ import java.util.stream.Stream;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkGraphComputer implements GraphComputer {
+public final class SparkGraphComputer implements GraphComputer {
 
     public static final Logger LOGGER = LoggerFactory.getLogger(SparkGraphComputer.class);
 
     protected final SparkConf configuration = new SparkConf();
-
     protected final HadoopGraph hadoopGraph;
     private boolean executed = false;
     private final Set<MapReduce> mapReducers = new HashSet<>();
@@ -116,137 +107,99 @@ public class SparkGraphComputer implements GraphComputer {
         if (null == this.vertexProgram && this.mapReducers.isEmpty())
             throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
         // it is possible to run mapreducers without a vertex program
-        if (null != this.vertexProgram)
+        if (null != this.vertexProgram) {
             GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
-
+            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
+        }
+        // apache and hadoop configurations that are used throughout
         final org.apache.commons.configuration.Configuration apacheConfiguration = this.hadoopGraph.configuration();
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
 
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
                     final long startTime = System.currentTimeMillis();
                     SparkMemory memory = null;
-                    // load the graph
+                    SparkHelper.deleteOutputDirectory(hadoopConfiguration);
+                    ////////////////////////////////
+                    // process the vertex program //
+                    ////////////////////////////////
                     if (null != this.vertexProgram) {
+                        // set up the spark job
                         final SparkConf sparkConfiguration = new SparkConf();
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
-                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
-
-                        // set up the input format
+                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
                         ///
                         try {
-                            final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                            // create a message-passing friendly rdd from the hadoop input format
+                            JavaPairRDD<Object, SparkMessenger<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
-                                    VertexWritable.class);
-                            final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-                            GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
+                                    VertexWritable.class)
+                                    .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
 
-                            // set up the vertex program
+                            // set up the vertex program and wire up configurations
                             memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                             this.vertexProgram.setup(memory);
                             final SerializableConfiguration vertexProgramConfiguration = new SerializableConfiguration();
                             this.vertexProgram.storeState(vertexProgramConfiguration);
-                            this.mapReducers.addAll(this.vertexProgram.getMapReducers());
                             ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
                             ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+
                             // execute the vertex program
-                            while (true) {
-                                g = g.execute(vertexProgramConfiguration, memory);
-                                g.foreachPartition(iterator -> doNothing());
+                            do {
+                                graphRDD = SparkHelper.executeStep(graphRDD, this.vertexProgram, memory, vertexProgramConfiguration);
+                                graphRDD.foreachPartition(iterator -> doNothing()); // a cheap action that forces evaluation of the otherwise lazy RDD
+                                graphRDD.cache(); // TODO: learn about persistence and caching
                                 memory.incrIteration();
-                                if (this.vertexProgram.terminate(memory))
-                                    break;
-                            }
+                            } while (!this.vertexProgram.terminate(memory));
+
                             // write the output graph back to disk
-                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                            if (null != outputLocation) {
-                                try {
-                                    FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
-                                } catch (final IOException e) {
-                                    throw new IllegalStateException(e.getMessage(), e);
-                                }
-                                // map back to a <nullwritable,vertexwritable> stream for output
-                                g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
-                                        .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
-                                                NullWritable.class,
-                                                VertexWritable.class,
-                                                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
-                            }
+                            SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
                         } finally {
+                            // must close the context or bad things happen
                             sparkContext.close();
                         }
+                        sparkContext.close(); // why not try again todo
                     }
 
+                    //////////////////////////////
+                    // process the map reducers //
+                    //////////////////////////////
                     final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
-                    // execute mapreduce jobs
                     for (final MapReduce mapReduce : this.mapReducers) {
-                        // set up the map reduce job
-                        final SerializableConfiguration newConfiguration = new SerializableConfiguration(apacheConfiguration);
-                        mapReduce.storeState(newConfiguration);
-
-                        // set up spark job
+                        // set up the spark job
                         final SparkConf sparkConfiguration = new SparkConf();
                         sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
                         hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                         if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
                             hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
-                        // set up the input format
                         final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+                        // execute the map reduce job
                         try {
-                            final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                            final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                     (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                     NullWritable.class,
                                     VertexWritable.class);
 
+                            final SerializableConfiguration newApacheConfiguration = new SerializableConfiguration(apacheConfiguration);
+                            mapReduce.storeState(newApacheConfiguration);
                             // map
-                            JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
-                                final MapReduce m = MapReduce.createMapReduce(newConfiguration);
-                                final SparkMapEmitter mapEmitter = new SparkMapEmitter();
-                                m.map(tuple._2().get(), mapEmitter);
-                                return mapEmitter.getEmissions();
-                            });
-                            if (mapReduce.getMapKeySort().isPresent())
-                                mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
-                            // todo: combine
+                            final JavaPairRDD mapRDD = SparkHelper.executeMap(hadoopGraphRDD, mapReduce, newApacheConfiguration);
+                            // combine todo
                             // reduce
-                            JavaPairRDD<?, ?> reduceRDD = null;
-                            if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                                reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
-                                    final MapReduce m = MapReduce.createMapReduce(newConfiguration);
-                                    final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
-                                    m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
-                                    return reduceEmitter.getEmissions();
-                                });
-                                if (mapReduce.getReduceKeySort().isPresent())
-                                    reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
-                            }
-                            // write the output graph back to disk
-                            final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-                            if (null != outputLocation) {
-                                // map back to a Hadoop stream for output
-                                ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
-                                        ObjectWritable.class,
-                                        ObjectWritable.class,
-                                        (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
-                                // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
-                                // to get results you have to look through HDFS directory structure. Oh the horror.
-                                try {
-                                    if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
-                                        mapReduce.addResultToMemory(finalMemory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
-                                    else
-                                        HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
-                                } catch (final IOException e) {
-                                    throw new IllegalStateException(e.getMessage(), e);
-                                }
-                            }
+                            final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+
+                            // write the map reduce output back to disk (memory)
+                            SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                         } finally {
+                            // must close the context or bad things happen
                             sparkContext.close();
                         }
+                        sparkContext.close(); // why not try again todo
                     }
 
                     // update runtime and return the newly computed graph
@@ -256,6 +209,8 @@ public class SparkGraphComputer implements GraphComputer {
         );
     }
 
+    /////////////////
+
     private static final void doNothing() {
         // a cheap action
     }
@@ -278,19 +233,18 @@ public class SparkGraphComputer implements GraphComputer {
         }
     }
 
-    /////////////////
-
     public static void main(final String[] args) throws Exception {
-        final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-gryo.properties");
-        // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
-        final HadoopGraph graph = HadoopGraph.open(configuration);
-        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
-        // TODO: remove everything below
-        System.out.println(result);
-        //result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
-        //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
-        result.graph().V().valueMap().forEachRemaining(System.out::println);
+        final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
     }
 
-
+    @Override
+    public Features features() {
+        return new Features() {
+            @Override
+            public boolean supportsNonSerializableObjects() {
+                return true;  // TODO
+            }
+        };
+    }
 }

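Two Spark details in the rewritten loop deserve a note: transformations are lazy, so the no-op foreachPartition acts as a cheap action that forces each iteration to execute before terminate() is consulted, and cache() keeps the current graphRDD in memory between iterations. A standalone sketch of the same pattern, assuming a local master and toy data:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.Arrays;

    public final class ForceEvaluation {
        public static void main(final String[] args) {
            final JavaSparkContext sc =
                    new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("force-eval"));
            try {
                JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
                rdd = rdd.map(i -> i + 1);       // lazy: nothing has run yet
                rdd.cache();                     // keep the results around for later actions
                rdd.foreachPartition(it -> {});  // cheap action: runs the pipeline, ships nothing to the driver
                System.out.println(rdd.count()); // count() also works but aggregates a result back
            } finally {
                sc.close();                      // must close the context or bad things happen
            }
        }
    }
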
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 0f5acc1..6cd8885 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
 
     private final List<Tuple2<K, V>> emissions = new ArrayList<>();
 

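The emitter is just a buffer: each emitted key/value pair is stored as a scala.Tuple2 so the enclosing flatMapToPair in SparkHelper can return the emissions wholesale. A sketch of the likely shape, reconstructed from this hunk and its call sites (treat it as illustrative rather than the exact source):

    import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.List;

    public final class BufferingMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {

        private final List<Tuple2<K, V>> emissions = new ArrayList<>();

        @Override
        public void emit(final K key, final V value) {
            this.emissions.add(new Tuple2<>(key, value)); // buffer for the Spark flatMap
        }

        public Iterable<Tuple2<K, V>> getEmissions() {
            return this.emissions;
        }
    }
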
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index b277e83..90bc73a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -60,7 +60,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
         for (final String key : this.memoryKeys) {
             this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), new RuleAccumulator()));
         }
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
index 470774a..10b9525 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
@@ -23,7 +23,7 @@ import org.apache.spark.AccumulatorParam;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
+public final class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
     @Override
     public SparkMemory addAccumulator(final SparkMemory first, final SparkMemory second) {
         return first;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index cc170c4..812bdd3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -28,28 +28,31 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public class SparkMessenger<M> implements Serializable, Messenger<M> {
 
-    protected Vertex vertex;
-    protected List<M> incoming;
-    protected Map<Object, List<M>> outgoing = new HashMap<>();
+    private Vertex vertex;
+    private List<M> incoming;
+    private Map<Object, List<M>> outgoing = new HashMap<>();
 
     public SparkMessenger() {
 
     }
 
+    public SparkMessenger(final Vertex vertex) {
+        this.vertex = vertex;
+        this.incoming = new ArrayList<>();
+    }
+
     public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
         this.vertex = vertex;
         this.incoming = incomingMessages;
@@ -63,6 +66,22 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
         this.outgoing.clear();
     }
 
+    public Vertex getVertex() {
+        return this.vertex;
+    }
+
+    public void setVertex(final Vertex vertex) {
+        this.vertex = vertex;
+    }
+
+    public void addIncomingMessages(final SparkMessenger<M> otherMessenger) {
+        this.incoming.addAll(otherMessenger.incoming);
+    }
+
+    public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
+        return this.outgoing.entrySet();
+    }
+
     @Override
     public Iterable<M> receiveMessages(final MessageScope messageScope) {
         return this.incoming;

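SparkMessenger now hides its state behind accessors and plays two roles: the serializable wrapper around each vertex and the Messenger handed to VertexProgram.execute(). Inside a vertex program the exchange looks roughly like the following sketch (the global scope and the counting fold are illustrative):

    import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
    import org.apache.tinkerpop.gremlin.process.computer.Messenger;

    public final class MessengerUsage {

        // Shape of a VertexProgram.execute(vertex, messenger, memory) body:
        // drain last iteration's inbox, then queue messages for the next one.
        public static <M> int exchange(final Messenger<M> messenger, final M outgoing) {
            final MessageScope scope = MessageScope.Global.instance(); // broadcast-style scope
            int received = 0;
            for (final M incoming : messenger.receiveMessages(scope)) {
                received++; // stand-in for folding the message into vertex state
            }
            messenger.sendMessage(scope, outgoing); // lands in SparkMessenger's outgoing map
            return received;
        }
    }
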
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
index b9f056c..77e7072 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
 
     private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
deleted file mode 100644
index 121ae2d..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
-* @author Marko A. Rodriguez (http://markorodriguez.com)
-*/
-public final class ToyVertex implements Vertex, Vertex.Iterators, Serializable {
-
-    private final Object id;
-    private static final String TOY_VERTEX = "toyVertex";
-
-    public ToyVertex(final Object id) {
-        this.id = id;
-    }
-
-    ToyVertex() {
-        this.id = null;
-    }
-
-    @Override
-    public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Object id() {
-        return this.id;
-    }
-
-    @Override
-    public String label() {
-        return TOY_VERTEX;
-    }
-
-    @Override
-    public Graph graph() {
-        return EmptyGraph.instance();
-    }
-
-    @Override
-    public <V> VertexProperty<V> property(final String key, final V value) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Iterators iterators() {
-        return this;
-    }
-
-    @Override
-    public Iterator<Edge> edgeIterator(Direction direction, String... edgeLabels) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public Iterator<Vertex> vertexIterator(Direction direction, String... edgeLabels) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public <V> Iterator<VertexProperty<V>> propertyIterator(String... propertyKeys) {
-        return Collections.emptyIterator();
-    }
-
-    @Override
-    public int hashCode() {
-        return ElementHelper.hashCode(this);
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        return ElementHelper.areEqual(this, other);
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.vertexString(this);
-    }
-}

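ToyVertex goes away in favor of TinkerPop's stock DetachedVertex, which SparkHelper (below) constructs as the lightweight placeholder on outgoing message tuples. Mirroring the executeStep code later in this diff, the replacement amounts to:

    import org.apache.tinkerpop.gremlin.structure.Vertex;
    import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;

    import java.util.Collections;

    public final class PlaceholderVertices {

        // A DetachedVertex carrying only an id, the default label, and no
        // properties -- just enough identity to route a message tuple.
        public static Vertex placeholder(final Object id) {
            return new DetachedVertex(id, Vertex.DEFAULT_LABEL, Collections.emptyMap());
        }
    }
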
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
new file mode 100644
index 0000000..ece9d7c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessenger;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkHelper {
+
+    private SparkHelper() {
+    }
+
+    public static <M> JavaPairRDD<Object, SparkMessenger<M>> executeStep(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final VertexProgram<M> globalVertexProgram, final SparkMemory memory, final Configuration apacheConfiguration) {
+        JavaPairRDD<Object, SparkMessenger<M>> current = graphRDD;
+        // execute vertex program
+        current = current.mapPartitionsToPair(iterator -> {     // each partition has a copy of the vertex program
+            final VertexProgram<M> vertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+            return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
+                vertexProgram.execute(tuple._2().getVertex(), tuple._2(), memory);
+                return tuple;
+            });
+        });
+        // clear all previous incoming messages
+        if (!memory.isInitialIteration()) {
+            current = current.mapValues(messenger -> {
+                messenger.clearIncomingMessages();
+                return messenger;
+            });
+        }
+        // emit messages
+        current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
+            final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().getOutgoingMessages()
+                    .stream()
+                    .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new DetachedVertex(entry.getKey(), Vertex.DEFAULT_LABEL, Collections.emptyMap()), entry.getValue()))) // maybe go back to toy vertex if label is expensive
+                    .collect(Collectors.toList());          // the message vertices
+            list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
+            return list;
+        });
+
+        // TODO: local message combiner
+        if (globalVertexProgram.getMessageCombiner().isPresent()) {
+           /* current = current.combineByKey(messenger -> {
+                return messenger;
+            });*/
+        }
+
+        // "message pass" via reduction
+        current = current.reduceByKey((a, b) -> {
+            if (a.getVertex() instanceof DetachedVertex && !(b.getVertex() instanceof DetachedVertex))
+                a.setVertex(b.getVertex());
+            a.addIncomingMessages(b);
+            return a;
+        });
+
+        // clear all previous outgoing messages
+        current = current.mapValues(messenger -> {
+            messenger.clearOutgoingMessages();
+            return messenger;
+        });
+        return current;
+    }
+
+    public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.flatMapToPair(tuple -> {
+            final MapReduce<K, V, ?, ?, ?> m = MapReduce.createMapReduce(apacheConfiguration);    // todo create only for each partition
+            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+            m.map(tuple._2().get(), mapEmitter);
+            return mapEmitter.getEmissions();
+        });
+        if (mapReduce.getMapKeySort().isPresent())
+            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+        return mapRDD;
+    }
+
+    // TODO: public static executeCombine()
+
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+        JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
+            final MapReduce<K, V, OK, OV, ?> m = MapReduce.createMapReduce(apacheConfiguration);     // todo create only for each partition
+            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+            m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
+            return reduceEmitter.getEmissions();
+        });
+        if (mapReduce.getReduceKeySort().isPresent())
+            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+        return reduceRDD;
+    }
+
+    public static void deleteOutputDirectory(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            try {
+                FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+
+    public static <M> void saveVertexProgramRDD(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a <nullwritable,vertexwritable> stream for output
+            graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().getVertex())))
+                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+                            NullWritable.class,
+                            VertexWritable.class,
+                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+        }
+    }
+
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+        if (null != outputLocation) {
+            // map back to a Hadoop stream for output
+            mapReduceRDD.mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+                    ObjectWritable.class,
+                    ObjectWritable.class,
+                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
+            // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
+            // to get results you have to look through HDFS directory structure. Oh the horror.
+            try {
+                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+                else
+                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+            } catch (final IOException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+    }
+}

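The heart of executeStep is that message passing is just a shuffle: each vertex flat-maps to one (targetId, messenger) pair per outgoing message plus one (ownId, self) pair, and reduceByKey folds every pair sharing a key back into a single messenger whose incoming list holds all delivered messages. Stripped of the TinkerPop types, a self-contained sketch of the pattern (local master and toy payloads are illustrative):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public final class ShuffleAsMessagePass {
        public static void main(final String[] args) {
            final JavaSparkContext sc =
                    new JavaSparkContext(new SparkConf().setMaster("local[2]").setAppName("msg-pass"));
            try {
                // (targetVertexId, message) pairs emitted by three vertices
                final JavaPairRDD<Long, String> outbox = sc.parallelizePairs(Arrays.asList(
                        new Tuple2<>(1L, "from 2"),
                        new Tuple2<>(1L, "from 3"),
                        new Tuple2<>(2L, "from 1")));
                // the "message pass": one shuffle routes everything addressed to
                // the same vertex together, and the reducer folds the payloads
                outbox.reduceByKey((a, b) -> a + ", " + b)
                        .collect()
                        .forEach(t -> System.out.println("vertex " + t._1() + " <- " + t._2()));
            } finally {
                sc.close();
            }
        }
    }
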
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 76636cd..0fb2e9f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.util.StreamFactory;
@@ -86,9 +85,9 @@ public class HadoopConfiguration extends BaseConfiguration implements Serializab
         this.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
     }
 
-    public Class<? extends GraphComputer> getGraphComputer() {
+    public Class<? extends GraphComputer> getGraphComputer(final Class<? extends GraphComputer> defaultGraphComputer) {
         if (!this.containsKey(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER))
-            return GiraphGraphComputer.class;
+            return defaultGraphComputer;
         else {
             try {
                 return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 8154888..990fc77 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -146,7 +146,7 @@ public class HadoopGraph implements Graph, Graph.Iterators {
 
     private HadoopGraph(final Configuration configuration) {
         this.configuration = new HadoopConfiguration(configuration);
-        this.graphComputerClass = this.configuration.getGraphComputer();
+        this.graphComputerClass = this.configuration.getGraphComputer(GiraphGraphComputer.class);
     }
 
     public static HadoopGraph open() {