You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/03 19:05:20 UTC

incubator-tinkerpop git commit: Spark MapReduce engine built. It was more complex than I suspected. Will definitely need to break up SparkGraphComputer into various XXXHelper classes with static methods... it's pretty beefy right now. All that is left is Me

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/spark 3ed0fa6cf -> 70fc529be


Spark MapReduce engine built. It was more complex than I suspected. Will definitely need to break up SparkGraphComputer into various XXXHelper classes with static methods... it's pretty beefy right now. All that is left is Memory using Spark Aggregators (going to be painful).


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/70fc529b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/70fc529b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/70fc529b

Branch: refs/heads/spark
Commit: 70fc529be06cbe45c7800af900abc4e65dbd3a11
Parents: 3ed0fa6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 11:05:27 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 11:05:27 2015 -0700

----------------------------------------------------------------------
 hadoop-gremlin/conf/spark-kryo.properties       |  2 +-
 .../tinkerpop/gremlin/hadoop/Constants.java     |  5 ++
 .../computer/spark/SparkGraphComputer.java      | 81 ++++++++++++++++++--
 .../process/computer/spark/SparkMapEmitter.java | 42 ++++++++++
 .../process/computer/spark/SparkMemory.java     |  6 +-
 .../computer/spark/SparkReduceEmitter.java      | 42 ++++++++++
 .../process/computer/util/MapReduceHelper.java  | 32 ++++----
 7 files changed, 180 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
index ec8b393..483349f 100644
--- a/hadoop-gremlin/conf/spark-kryo.properties
+++ b/hadoop-gremlin/conf/spark-kryo.properties
@@ -22,7 +22,7 @@ gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io
 gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
 gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
 gremlin.hadoop.deriveMemory=false
-gremlin.hadoop.jarsInDistributedCache=true
+gremlin.hadoop.jarsInDistributedCache=false
 
 gremlin.hadoop.inputLocation=hdfs://localhost:9000/user/marko/tinkerpop-modern-vertices.gio
 gremlin.hadoop.outputLocation=output

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index f229b17..bf06fcc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop;
 
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 
 /**
@@ -46,4 +47,8 @@ public class Constants {
     public static final String GREMLIN_HADOOP_MAP_REDUCE_CLASS = "gremlin.hadoop.mapReduceClass";
     public static final String GREMLIN_HADOOP_HALT = "gremlin.hadoop.halt";
     public static final String MAP_MEMORY = "gremlin.hadoop.mapMemory";
+
+    public static final String SEQUENCE_WARNING = "The " + Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT
+            + " is not " + SequenceFileOutputFormat.class.getCanonicalName()
+            + " and thus, graph computer memory can not be converted to Java objects";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 4b30e16..7cace20 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -27,11 +27,14 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
@@ -39,6 +42,7 @@ import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankMapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -51,7 +55,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -133,8 +139,8 @@ public class SparkGraphComputer implements GraphComputer {
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class);
-                        final JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-                        GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
+                        final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
+                        GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
 
                         // set up the vertex program
                         this.vertexProgram.setup(memory);
@@ -164,16 +170,73 @@ public class SparkGraphComputer implements GraphComputer {
                                             VertexWritable.class,
                                             (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
                         }
+                        sparkContext.close();
                     }
 
                     // execute mapreduce jobs
                     for (final MapReduce mapReduce : this.mapReducers) {
-                        //TODO
-                       /* g.mapValues(messenger -> {
-                            mapReduce.map(messenger.vertex, null);
-                            return messenger;
-                        }).combine().reduce();*/
+                        // set up the map reduce job
+                        final org.apache.commons.configuration.Configuration mapReduceConfiguration = new SerializableConfiguration();
+                        mapReduce.storeState(mapReduceConfiguration);
+
+                        // set up spark job
+                        final SparkConf sparkConfiguration = new SparkConf();
+                        sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
+                        hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+                        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
+                            hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
+                        // set up the input format
+                        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
+                        SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+                        final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+                                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+                                NullWritable.class,
+                                VertexWritable.class);
+
+                        // map
+                        JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
+                            final MapReduce m = MapReduce.createMapReduce(mapReduceConfiguration);
+                            final SparkMapEmitter mapEmitter = new SparkMapEmitter();
+                            m.map(tuple._2().get(), mapEmitter);
+                            return mapEmitter.getEmissions();
+                        });
+                        if (mapReduce.getMapKeySort().isPresent())
+                            mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
+                        // todo: combine
+                        // reduce
+                        JavaPairRDD<?, ?> reduceRDD = null;
+                        if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+                            reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
+                                final MapReduce m = MapReduce.createMapReduce(mapReduceConfiguration);
+                                final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
+                                m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
+                                return reduceEmitter.getEmissions();
+                            });
+                            if (mapReduce.getReduceKeySort().isPresent())
+                                reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
+                        }
+                        // write the output graph back to disk
+                        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+                        if (null != outputLocation) {
+                            // map back to a <nullwritable,vertexwritable> stream for output
+                            ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+                                    ObjectWritable.class,
+                                    ObjectWritable.class,
+                                    (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
+                            // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
+                            // to get results you have to look through HDFS directory structure. Oh the horror.
+                            try {
+                                if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+                                    mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+                                else
+                                    HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+                            } catch (final IOException e) {
+                                throw new IllegalStateException(e.getMessage(), e);
+                            }
+                        }
+                        sparkContext.close();
                     }
+
                     // update runtime and return the newly computed graph
                     memory.setRuntime(System.currentTimeMillis() - startTime);
                     memory.complete();
@@ -210,8 +273,10 @@ public class SparkGraphComputer implements GraphComputer {
         final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-kryo.properties");
         // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
         final HadoopGraph graph = HadoopGraph.open(configuration);
-        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
+        final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).mapReduce(PageRankMapReduce.build().create()).submit().get();
+        // TODO: remove everything below
         System.out.println(result);
+        result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
         //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
         result.graph().V().valueMap().forEachRemaining(System.out::println);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
new file mode 100644
index 0000000..3a4a424
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+
+    private final List<Tuple2<K, V>> emissions = new ArrayList<>();
+
+    @Override
+    public void emit(final K key, final V value) {
+        emissions.add(new Tuple2<>(key, value));
+    }
+
+    public Iterable<Tuple2<K, V>> getEmissions() {
+        return this.emissions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index 88b046e..eb2af7f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -145,8 +145,8 @@ public final class SparkMemory implements Memory.Admin, Serializable {
     }
 
     private void checkKeyValue(final String key, final Object value) {
-        if (!this.memoryKeys.contains(key))
-            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
-        MemoryHelper.validateValue(value);
+        //if (!this.memoryKeys.contains(key))
+        //    throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        //MemoryHelper.validateValue(value);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
new file mode 100644
index 0000000..b9f056c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+
+    private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+
+    @Override
+    public void emit(final OK key, final OV value) {
+        this.emissions.add(new Tuple2<>(key, value));
+    }
+
+    public List<Tuple2<OK, OV>> getEmissions() {
+        return this.emissions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/70fc529b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index cd49a91..89a1abf 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -18,18 +18,6 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.util;
 
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopCombine;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopMap;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopReduce;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableComparator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +31,18 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopCombine;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopMap;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.HadoopReduce;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableComparator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -56,10 +56,6 @@ public final class MapReduceHelper {
     private MapReduceHelper() {
     }
 
-    private static final String SEQUENCE_WARNING = "The " + Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT
-            + " is not " + SequenceFileOutputFormat.class.getCanonicalName()
-            + " and thus, graph computer memory can not be converted to Java objects";
-
     public static void executeMapReduceJob(final MapReduce mapReduce, final Memory.Admin memory, final Configuration configuration) throws IOException, ClassNotFoundException, InterruptedException {
         final Configuration newConfiguration = new Configuration(configuration);
         final BaseConfiguration apacheConfiguration = new BaseConfiguration();
@@ -70,7 +66,7 @@ public final class MapReduceHelper {
             if (newConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
                 mapReduce.addResultToMemory(memory, new ObjectWritableIterator(configuration, memoryPath));
             else
-                HadoopGraph.LOGGER.warn(SEQUENCE_WARNING);
+                HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
         } else {
             final Optional<Comparator<?>> mapSort = mapReduce.getMapKeySort();
             final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
@@ -137,7 +133,7 @@ public final class MapReduceHelper {
             if (newConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
                 mapReduce.addResultToMemory(memory, new ObjectWritableIterator(configuration, memoryPath));
             else
-                HadoopGraph.LOGGER.warn(SEQUENCE_WARNING);
+                HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
         }
     }
 }