Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:03 UTC

[08/20] incubator-tinkerpop git commit: SparkGraphComputer is prim and proper. No longer using public static void main(). The GraphComputer API is legitimately implemented. All that is left --- MapReduce engine (easy) and GraphComputer Memory (hard).

SparkGraphComputer is prim and proper. No longer using public static void main(). The GraphComputer API is legitimately implemented. All that is left --- MapReduce engine (easy) and GraphComputer Memory (hard).
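
In practice, SparkGraphComputer is now driven through the standard GraphComputer API,
the same pattern the test main() at the bottom of SparkGraphComputer.java follows. A
minimal sketch (the relative properties path and the wrapper class name are assumptions;
the calls themselves mirror this commit):

    import org.apache.commons.configuration.FileConfiguration;
    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
    import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
    import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
    import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

    public class SparkGraphComputerExample {
        public static void main(final String[] args) throws Exception {
            // load the Hadoop-Gremlin configuration (see conf/spark-kryo.properties below)
            final FileConfiguration configuration = new PropertiesConfiguration("conf/spark-kryo.properties");
            final HadoopGraph graph = HadoopGraph.open(configuration);
            // submit the vertex program declared in the properties file (PageRankVertexProgram)
            final ComputerResult result = new SparkGraphComputer(graph)
                    .program(VertexProgram.createVertexProgram(configuration))
                    .submit().get();
            // read the computed graph back through the configured output format
            result.graph().V().valueMap().forEachRemaining(System.out::println);
        }
    }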


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/051994ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/051994ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/051994ae

Branch: refs/heads/master
Commit: 051994aeac31de6e02213f1de5eb258f798602eb
Parents: c98d5be
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 09:28:28 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 09:28:28 2015 -0700

----------------------------------------------------------------------
 hadoop-gremlin/conf/giraph-graphson.properties  |  57 ++++++++
 hadoop-gremlin/conf/giraph-kryo.properties      |  31 +++++
 hadoop-gremlin/conf/hadoop-graphson.properties  |  57 --------
 hadoop-gremlin/conf/hadoop-kryo.properties      |  31 -----
 hadoop-gremlin/conf/spark-kryo.properties       |  38 ++++++
 .../computer/giraph/GiraphGraphComputer.java    |   3 +-
 .../computer/spark/GraphComputerRDD.java        |   3 +-
 .../hadoop/process/computer/spark/RDDTools.java |  46 -------
 .../spark/SerializableConfiguration.java        |   2 +-
 .../computer/spark/SparkGraphComputer.java      | 135 +++++++++++++------
 .../process/computer/spark/SparkMessenger.java  |   5 -
 .../process/computer/spark/SparkVertex.java     |  44 +++---
 12 files changed, 248 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/giraph-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-graphson.properties b/hadoop-gremlin/conf/giraph-graphson.properties
new file mode 100644
index 0000000..090b0ce
--- /dev/null
+++ b/hadoop-gremlin/conf/giraph-graphson.properties
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# the graph class
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+# i/o formats for graphs and memory (i.e. computer result)
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+# i/o locations
+gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.ldjson
+gremlin.hadoop.outputLocation=output
+# deriving a complete view of the memory requires an extra mapreduce job and thus, if not needed, should be avoided
+gremlin.hadoop.deriveMemory=false
+# if the job jars are not on the classpath of every hadoop node, then they must be provided to the distributed cache at runtime
+gremlin.hadoop.jarsInDistributedCache=true
+# the vertex program to execute
+gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
+
+# It is possible to provide Giraph configuration parameters for use with GiraphGraphComputer
+############################################################################################
+giraph.minWorkers=2
+giraph.maxWorkers=2
+# giraph.useInputSplitLocality=false
+# giraph.logLevel=debug
+
+# It is possible to provide Hadoop configuration parameters.
+# Note that these parameters are provided to each MapReduce job within the entire Hadoop-Gremlin job pipeline.
+# Some of these parameters may be over written by Hadoop-Gremlin as deemed necessary.
+##############################################################################################################
+# mapred.linerecordreader.maxlength=5242880
+# mapred.map.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
+# mapred.reduce.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
+# mapred.map.tasks=6
+# mapred.reduce.tasks=3
+# mapred.job.reuse.jvm.num.tasks=-1
+# mapred.task.timeout=5400000
+# mapred.reduce.parallel.copies=50
+# io.sort.factor=100
+# io.sort.mb=200
+
+
+
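
This file is the renamed hadoop-graphson.properties (deleted below) and targets
GiraphGraphComputer. A minimal sketch of how such a file is consumed, mirroring the
GiraphGraphComputer main() further down in this diff (the relative path is an
assumption; imports as in the Spark sketch above, plus GiraphGraphComputer):

    // load the properties file and run the configured PageRankVertexProgram on Giraph
    final FileConfiguration configuration = new PropertiesConfiguration("conf/giraph-graphson.properties");
    final HadoopGraph graph = HadoopGraph.open(configuration);
    new GiraphGraphComputer(graph)
            .program(VertexProgram.createVertexProgram(configuration))
            .submit().get();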

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/giraph-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-kryo.properties b/hadoop-gremlin/conf/giraph-kryo.properties
new file mode 100644
index 0000000..d546da7
--- /dev/null
+++ b/hadoop-gremlin/conf/giraph-kryo.properties
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+gremlin.hadoop.deriveMemory=false
+gremlin.hadoop.jarsInDistributedCache=true
+
+gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.gio
+gremlin.hadoop.outputLocation=output
+#gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram
+#gremlin.traversalVertexProgram.traversalSupplier.type=CLASS
+#gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
+
+giraph.minWorkers=2
+giraph.maxWorkers=2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/hadoop-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-graphson.properties b/hadoop-gremlin/conf/hadoop-graphson.properties
deleted file mode 100644
index 090b0ce..0000000
--- a/hadoop-gremlin/conf/hadoop-graphson.properties
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# the graph class
-gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
-# i/o formats for graphs and memory (i.e. computer result)
-gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-# i/o locations
-gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.ldjson
-gremlin.hadoop.outputLocation=output
-# deriving a complete view of the memory requires an extra mapreduce job and thus, if not needed, should be avoided
-gremlin.hadoop.deriveMemory=false
-# if the job jars are not on the classpath of every hadoop node, then they must be provided to the distributed cache at runtime
-gremlin.hadoop.jarsInDistributedCache=true
-# the vertex program to execute
-gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
-
-# It is possible to provide Giraph configuration parameters for use with GiraphGraphComputer
-############################################################################################
-giraph.minWorkers=2
-giraph.maxWorkers=2
-# giraph.useInputSplitLocality=false
-# giraph.logLevel=debug
-
-# It is possible to provide Hadoop configuration parameters.
-# Note that these parameters are provided to each MapReduce job within the entire Hadoop-Gremlin job pipeline.
-# Some of these parameters may be over written by Hadoop-Gremlin as deemed necessary.
-##############################################################################################################
-# mapred.linerecordreader.maxlength=5242880
-# mapred.map.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
-# mapred.reduce.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
-# mapred.map.tasks=6
-# mapred.reduce.tasks=3
-# mapred.job.reuse.jvm.num.tasks=-1
-# mapred.task.timeout=5400000
-# mapred.reduce.parallel.copies=50
-# io.sort.factor=100
-# io.sort.mb=200
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/hadoop-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-kryo.properties b/hadoop-gremlin/conf/hadoop-kryo.properties
deleted file mode 100644
index d546da7..0000000
--- a/hadoop-gremlin/conf/hadoop-kryo.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
-gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
-gremlin.hadoop.jarsInDistributedCache=true
-
-gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.gio
-gremlin.hadoop.outputLocation=output
-#gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram
-#gremlin.traversalVertexProgram.traversalSupplier.type=CLASS
-#gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
-
-giraph.minWorkers=2
-giraph.maxWorkers=2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
new file mode 100644
index 0000000..de4df3b
--- /dev/null
+++ b/hadoop-gremlin/conf/spark-kryo.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+gremlin.hadoop.deriveMemory=false
+gremlin.hadoop.jarsInDistributedCache=true
+
+gremlin.hadoop.inputLocation=hdfs://localhost:9000/user/marko/tinkerpop-modern-vertices.gio
+gremlin.hadoop.outputLocation=output
+
+# the vertex program to execute
+gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
+
+# It is possible to provide Spark configuration parameters for use with SparkGraphComputer
+##########################################################################################
+spark.master=local[4]
+spark.executor.memory=1024m
+spark.eventLog.enabled=true
+#spark.serializer=org.apache.spark.serializer.KryoSerializer
\ No newline at end of file
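
Unlike the keys in the giraph-*.properties files above, the spark.* keys here are not
Hadoop parameters: the new SparkGraphComputer.submit() (further down in this diff)
copies every entry of the Hadoop Configuration into a SparkConf, which is how
spark.master, spark.executor.memory and spark.eventLog.enabled reach the Spark context.
Condensed from that method, not standalone (names as in the diff):

    // Hadoop's Configuration is Iterable<Map.Entry<String, String>>, so every key,
    // including the spark.* ones above, is forwarded to the SparkConf
    final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration());
    final SparkConf sparkConfiguration = new SparkConf();
    sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + vertexProgram);
    hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
    final JavaSparkContext sc = new JavaSparkContext(sparkConfiguration);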

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index e382699..7a5e362 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -222,8 +222,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
 
     public static void main(final String[] args) throws Exception {
         try {
-            final FileConfiguration configuration = new PropertiesConfiguration();
-            configuration.load(new File(args[0]));
+            final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
             final GiraphGraphComputer computer = new GiraphGraphComputer(HadoopGraph.open(configuration));
             computer.program(VertexProgram.createVertexProgram(configuration)).submit().get();
         } catch (Exception e) {
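
The change here is a Commons Configuration simplification only: the one-argument
PropertiesConfiguration constructor locates and loads the named file itself, so the
explicit load(File) step disappears. Shown side by side with illustrative variable
names, inside the existing try/catch:

    // before: construct, then load from a java.io.File
    final FileConfiguration oldStyle = new PropertiesConfiguration();
    oldStyle.load(new File(args[0]));

    // after: the String constructor resolves and loads the file in one step
    final FileConfiguration newStyle = new PropertiesConfiguration(args[0]);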

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
index abf0ac6..786e5af 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
@@ -57,7 +57,7 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
             });
         });
         // clear all previous incoming messages
-        if(!memory.isInitialIteration()) {
+        if (!memory.isInitialIteration()) {
             current = current.mapValues(messenger -> {
                 messenger.clearIncomingMessages();
                 return messenger;
@@ -97,6 +97,7 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
 
     //////////////
 
+    // TODO: What the hell is this for?
     @Override
     public JavaRDD zipPartitions(JavaRDDLike uJavaRDDLike, FlatMapFunction2 iteratorIteratorVFlatMapFunction2) {
         return (JavaRDD) new JavaRDD<>(null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
deleted file mode 100644
index cef6040..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RDDTools {
-
-    public static <M> void sendMessage(final Tuple2<Vertex, List<M>> tuple, final M message) {
-        tuple._2().add(message);
-    }
-
-    public static <M> Iterable<M> receiveMessages(final Tuple2<Vertex, List<M>> tuple) {
-        return tuple._2();
-    }
-
-    public static <M> JavaPairRDD<Vertex, List<M>> endIteration(final JavaPairRDD<Vertex, List<M>> graph) {
-        return graph.flatMapToPair(tuple -> tuple._2().stream().map(message -> new Tuple2<>(tuple._1(), Arrays.asList(message))).collect(Collectors.toList()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
index a71b456..b4a8005 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SerializableConfiguration extends AbstractConfiguration implements Serializable {
+public final class SerializableConfiguration extends AbstractConfiguration implements Serializable {
 
     private final Map<String, Object> configurations = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 107f1bc..774c3c7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -18,20 +18,28 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
@@ -57,46 +65,14 @@ public class SparkGraphComputer implements GraphComputer {
     protected final SparkConf configuration = new SparkConf();
 
     protected final HadoopGraph hadoopGraph;
-
     private boolean executed = false;
-    private final Set<MapReduce> mapReduces = new HashSet<>();
+    private final Set<MapReduce> mapReducers = new HashSet<>();
     private VertexProgram vertexProgram;
 
     public SparkGraphComputer(final HadoopGraph hadoopGraph) {
         this.hadoopGraph = hadoopGraph;
     }
 
-    public static void main(final String[] args) throws IOException {
-        final SparkConf configuration = new SparkConf();
-        configuration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX);
-        configuration.setMaster("local");
-        final JavaSparkContext sc = new JavaSparkContext(configuration);
-        //JavaRDD<String> rdd = sc.textFile("hdfs://localhost:9000/user/marko/religious-traversals.txt");
-        final Configuration conf = new Configuration();
-        conf.set("mapred.input.dir", "hdfs://localhost:9000/user/marko/grateful-dead-vertices.gio");
-        JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(conf, KryoInputFormat.class, NullWritable.class, VertexWritable.class);
-        JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-
-        GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
-
-        final org.apache.commons.configuration.Configuration vertexProgram = new SerializableConfiguration();
-        final PageRankVertexProgram pageRankVertexProgram = PageRankVertexProgram.build().create();
-        pageRankVertexProgram.storeState(vertexProgram);
-        final SparkMemory memory = new SparkMemory(Collections.emptySet());
-
-        while (!pageRankVertexProgram.terminate(memory)) {
-            g = g.execute(vertexProgram, memory);
-            g.foreachPartition(iterator -> doNothing());
-            memory.incrIteration();
-        }
-        g.foreach(t -> System.out.println(t._2().vertex.property(PageRankVertexProgram.PAGE_RANK) + "-->" + t._2().vertex.value("name")));
-        System.out.println(g.count());
-    }
-
-    private static final void doNothing() {
-    }
-
-
     @Override
     public GraphComputer isolation(final Isolation isolation) {
         if (!isolation.equals(Isolation.BSP))
@@ -112,7 +88,7 @@ public class SparkGraphComputer implements GraphComputer {
 
     @Override
     public GraphComputer mapReduce(final MapReduce mapReduce) {
-        this.mapReduces.add(mapReduce);
+        this.mapReducers.add(mapReduce);
         return this;
     }
 
@@ -129,16 +105,95 @@ public class SparkGraphComputer implements GraphComputer {
             this.executed = true;
 
         // it is not possible execute a computer if it has no vertex program nor mapreducers
-        if (null == this.vertexProgram && this.mapReduces.isEmpty())
+        if (null == this.vertexProgram && this.mapReducers.isEmpty())
             throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
         // it is possible to run mapreducers without a vertex program
         if (null != this.vertexProgram)
             GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
 
-        final long startTime = System.currentTimeMillis();
+        final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
+        final SparkMemory memory = new SparkMemory(Collections.emptySet());
+
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
-            return null;
-        });
+                    final long startTime = System.currentTimeMillis();
+                    // load the graph
+                    if (null != this.vertexProgram) {
+                        final SparkConf sparkConfiguration = new SparkConf();
+                        sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
+                        hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+                        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
+                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+
+                        // set up the input format
+                        final JavaSparkContext sc = new JavaSparkContext(sparkConfiguration);
+                        final JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(hadoopConfiguration,
+                                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                                NullWritable.class,
+                                VertexWritable.class);
+                        final JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
+                        GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
+
+                        // set up the vertex program
+                        this.vertexProgram.setup(memory);
+                        final org.apache.commons.configuration.Configuration vertexProgramConfiguration = new SerializableConfiguration();
+                        this.vertexProgram.storeState(vertexProgramConfiguration);
+
+                        // execute the vertex program
+                        while (true) {
+                            g = g.execute(vertexProgramConfiguration, memory);
+                            g.foreachPartition(iterator -> doNothing());
+                            memory.incrIteration();
+                            if (this.vertexProgram.terminate(memory))
+                                break;
+                        }
+                        // write the output graph back to disk
+                        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+                        if (null != outputLocation) {
+                            try {
+                                FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+                            } catch (final IOException e) {
+                                throw new IllegalStateException(e.getMessage(), e);
+                            }
+                            // map back to a <nullwritable,vertexwritable> stream for output
+                            g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
+                                    .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+                                            NullWritable.class,
+                                            VertexWritable.class,
+                                            (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+                        }
+                    }
+
+                    // execute mapreduce jobs
+                    for (final MapReduce mapReduce : this.mapReducers) {
+                        //TODO
+                       /* g.mapValues(messenger -> {
+                            mapReduce.map(messenger.vertex, null);
+                            return messenger;
+                        }).combine().reduce();*/
+                    }
+                    // update runtime and return the newly computed graph
+                    memory.setRuntime(System.currentTimeMillis() - startTime);
+                    memory.complete();
+                    return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), memory.asImmutable());
+                }
+        );
+    }
+
+    private static final void doNothing() {
+        // a cheap action
     }
 
+    /////////////////
+
+    public static void main(final String[] args) throws Exception {
+        final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-kryo.properties");
+        // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+        final HadoopGraph graph = HadoopGraph.open(configuration);
+        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
+        System.out.println(result);
+        //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
+        result.graph().V().valueMap().forEachRemaining(System.out::println);
+    }
+
+
 }
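
A note on the superstep loop added above: execute() only declares lazy Spark
transformations, so the foreachPartition(iterator -> doNothing()) call appears to be
there purely as a cheap action that forces each superstep to evaluate before the memory
is consulted for termination. Condensed from the method, not standalone, with that
spelled out in comments (names as in the diff):

    // set up the vertex program once, then iterate supersteps until it asks to stop
    vertexProgram.setup(memory);
    final org.apache.commons.configuration.Configuration vertexProgramConfiguration = new SerializableConfiguration();
    vertexProgram.storeState(vertexProgramConfiguration);
    while (true) {
        g = g.execute(vertexProgramConfiguration, memory); // lazy: builds the superstep's transformations
        g.foreachPartition(iterator -> doNothing());       // action: forces the superstep to run
        memory.incrIteration();                            // advance the iteration counter
        if (vertexProgram.terminate(memory))               // the program decides when to stop
            break;
    }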

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index b18940a..cc170c4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -64,11 +64,6 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
     }
 
     @Override
-    public String toString() {
-        return "messageBox[incoming(" + this.incoming.size() + "):outgoing(" + this.outgoing.size() + ")]";
-    }
-
-    @Override
     public Iterable<M> receiveMessages(final MessageScope messageScope) {
         return this.incoming;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index 5a81017..38f8a61 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -38,54 +39,47 @@ import java.util.Iterator;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
+public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
 
     private static KryoWriter KRYO_WRITER = KryoWriter.build().create();
     private static KryoReader KRYO_READER = KryoReader.build().create();
     private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
 
     private transient TinkerVertex vertex;
-    private byte[] serializedForm;
+    private byte[] vertexBytes;
 
     public SparkVertex(final TinkerVertex vertex) {
         this.vertex = vertex;
         this.vertex.graph().variables().set(VERTEX_ID, this.vertex.id());
-        this.deflateVertex();
     }
 
     @Override
-    public Edge addEdge(String label, Vertex inVertex, Object... keyValues) {
-        inflateVertex();
+    public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
         return this.vertex.addEdge(label, inVertex, keyValues);
     }
 
     @Override
     public Object id() {
-        inflateVertex();
         return this.vertex.id();
     }
 
     @Override
     public String label() {
-        inflateVertex();
         return this.vertex.label();
     }
 
     @Override
     public Graph graph() {
-        inflateVertex();
         return this.vertex.graph();
     }
 
     @Override
-    public <V> VertexProperty<V> property(String key, V value) {
-        inflateVertex();
+    public <V> VertexProperty<V> property(final String key, final V value) {
         return this.vertex.property(key, value);
     }
 
     @Override
     public void remove() {
-        inflateVertex();
         this.vertex.remove();
     }
 
@@ -95,51 +89,59 @@ public class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
     }
 
     @Override
-    public Iterator<Edge> edgeIterator(Direction direction, String... edgeLabels) {
-        inflateVertex();
+    public Iterator<Edge> edgeIterator(final Direction direction, final String... edgeLabels) {
         return this.vertex.iterators().edgeIterator(direction, edgeLabels);
     }
 
     @Override
-    public Iterator<Vertex> vertexIterator(Direction direction, String... edgeLabels) {
-        inflateVertex();
+    public Iterator<Vertex> vertexIterator(final Direction direction, final String... edgeLabels) {
         return this.vertex.iterators().vertexIterator(direction, edgeLabels);
     }
 
     @Override
-    public <V> Iterator<VertexProperty<V>> propertyIterator(String... propertyKeys) {
-        inflateVertex();
+    public <V> Iterator<VertexProperty<V>> propertyIterator(final String... propertyKeys) {
         return this.vertex.iterators().propertyIterator(propertyKeys);
     }
 
+    ///////////////////////////////
+
     private void writeObject(final ObjectOutputStream outputStream) throws IOException {
-        this.inflateVertex();
         this.deflateVertex();
         outputStream.defaultWriteObject();
     }
 
+    private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
+        inputStream.defaultReadObject();
+        this.inflateVertex();
+    }
+
     private final void inflateVertex() {
         if (null != this.vertex)
             return;
 
         try {
-            final ByteArrayInputStream bis = new ByteArrayInputStream(this.serializedForm);
+            final ByteArrayInputStream bis = new ByteArrayInputStream(this.vertexBytes);
             final TinkerGraph tinkerGraph = TinkerGraph.open();
             KRYO_READER.readGraph(bis, tinkerGraph);
             bis.close();
+            this.vertexBytes = null;
             this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(tinkerGraph.variables().get(VERTEX_ID).get()).next();
-        } catch (final Exception e) {
+        } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     private final void deflateVertex() {
+        if (null != this.vertexBytes)
+            return;
+
         try {
             final ByteArrayOutputStream bos = new ByteArrayOutputStream();
             KRYO_WRITER.writeGraph(bos, this.vertex.graph());
             bos.flush();
             bos.close();
-            this.serializedForm = bos.toByteArray();
+            this.vertex = null;
+            this.vertexBytes = bos.toByteArray();
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }