Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/03 23:31:59 UTC
incubator-tinkerpop git commit: lots of clean up and organization. SparkGraphComputer is now really clean with all the dirty work being done by SparkHelper.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/spark 8246ee6d5 -> b6133ae75
lots of clean up and organization. SparkGraphComputer is now really clean with all the dirty work being done by SparkHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b6133ae7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b6133ae7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b6133ae7
Branch: refs/heads/spark
Commit: b6133ae75e4f8ebb29f0da042c37e9ce09d92ca4
Parents: 8246ee6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 15:32:05 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 15:32:05 2015 -0700
----------------------------------------------------------------------
.../computer/giraph/GiraphGraphComputer.java | 10 +-
.../computer/spark/GraphComputerRDD.java | 106 -----------
.../process/computer/spark/RuleAccumulator.java | 2 +-
.../computer/spark/SparkGraphComputer.java | 158 ++++++-----------
.../process/computer/spark/SparkMapEmitter.java | 2 +-
.../process/computer/spark/SparkMemory.java | 1 -
.../computer/spark/SparkMemoryAccumulator.java | 2 +-
.../process/computer/spark/SparkMessenger.java | 31 +++-
.../computer/spark/SparkReduceEmitter.java | 2 +-
.../process/computer/spark/ToyVertex.java | 114 ------------
.../computer/spark/util/SparkHelper.java | 177 +++++++++++++++++++
.../hadoop/structure/HadoopConfiguration.java | 5 +-
.../gremlin/hadoop/structure/HadoopGraph.java | 2 +-
13 files changed, 267 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 589c22c..56d029c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -221,14 +221,8 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
}
public static void main(final String[] args) throws Exception {
- try {
- final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
- final GiraphGraphComputer computer = new GiraphGraphComputer(HadoopGraph.open(configuration));
- computer.program(VertexProgram.createVertexProgram(configuration)).submit().get();
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- }
+ final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+ new GiraphGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
deleted file mode 100644
index 786e5af..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.function.FlatMapFunction2;
-import org.apache.spark.rdd.RDD;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-import scala.reflect.ManifestFactory;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>> {
-
- public GraphComputerRDD(final RDD<Tuple2<Object, SparkMessenger<M>>> rdd) {
- super(rdd, ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
- }
-
- public GraphComputerRDD(final JavaPairRDD<Object, SparkMessenger<M>> rdd) {
- super(rdd.rdd(), ManifestFactory.classType(Object.class), ManifestFactory.classType(SparkMessenger.class));
- }
-
- public GraphComputerRDD execute(final Configuration configuration, final SparkMemory memory) {
- JavaPairRDD<Object, SparkMessenger<M>> current = this;
- // execute vertex program
- current = current.mapPartitionsToPair(iterator -> {
- final VertexProgram<M> vertexProgram = VertexProgram.createVertexProgram(configuration);
- return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
- vertexProgram.execute(tuple._2().vertex, tuple._2(), memory);
- return tuple;
- });
- });
- // clear all previous incoming messages
- if (!memory.isInitialIteration()) {
- current = current.mapValues(messenger -> {
- messenger.clearIncomingMessages();
- return messenger;
- });
- }
- // emit messages
- current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
- final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().outgoing.entrySet()
- .stream()
- .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new ToyVertex(entry.getKey()), entry.getValue())))
- .collect(Collectors.toList()); // the message vertices
- list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
- return list;
- });
- // "message pass" via reduction
- current = current.reduceByKey((a, b) -> {
- if (a.vertex instanceof ToyVertex && !(b.vertex instanceof ToyVertex))
- a.vertex = b.vertex;
- a.incoming.addAll(b.incoming);
- return a;
- });
- // clear all previous outgoing messages
- current = current.mapValues(messenger -> {
- messenger.clearOutgoingMessages();
- return messenger;
- });
- return GraphComputerRDD.of(current);
- }
-
- public static <M> GraphComputerRDD<M> of(final JavaPairRDD<Object, SparkMessenger<M>> javaPairRDD) {
- return new GraphComputerRDD<>(javaPairRDD);
- }
-
- public static <M> GraphComputerRDD<M> of(final JavaRDD<Tuple2<Object, SparkMessenger<M>>> javaRDD) {
- return new GraphComputerRDD<>(javaRDD.rdd());
- }
-
- //////////////
-
- // TODO: What the hell is this for?
- @Override
- public JavaRDD zipPartitions(JavaRDDLike uJavaRDDLike, FlatMapFunction2 iteratorIteratorVFlatMapFunction2) {
- return (JavaRDD) new JavaRDD<>(null, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
index 59da2f4..446dbdb 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
@@ -24,7 +24,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class RuleAccumulator implements AccumulatorParam<Rule> {
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
@Override
public Rule addAccumulator(final Rule a, final Rule b) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 946d2af..dd004bc 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -22,20 +22,15 @@ import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
@@ -54,9 +49,6 @@ import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -66,12 +58,11 @@ import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SparkGraphComputer implements GraphComputer {
+public final class SparkGraphComputer implements GraphComputer {
public static final Logger LOGGER = LoggerFactory.getLogger(SparkGraphComputer.class);
protected final SparkConf configuration = new SparkConf();
-
protected final HadoopGraph hadoopGraph;
private boolean executed = false;
private final Set<MapReduce> mapReducers = new HashSet<>();
@@ -116,137 +107,99 @@ public class SparkGraphComputer implements GraphComputer {
if (null == this.vertexProgram && this.mapReducers.isEmpty())
throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
// it is possible to run mapreducers without a vertex program
- if (null != this.vertexProgram)
+ if (null != this.vertexProgram) {
GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
-
+ this.mapReducers.addAll(this.vertexProgram.getMapReducers());
+ }
+ // apache and hadoop configurations that are used throughout
final org.apache.commons.configuration.Configuration apacheConfiguration = this.hadoopGraph.configuration();
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
final long startTime = System.currentTimeMillis();
SparkMemory memory = null;
- // load the graph
+ SparkHelper.deleteOutputDirectory(hadoopConfiguration);
+ ////////////////////////////////
+ // process the vertex program //
+ ////////////////////////////////
if (null != this.vertexProgram) {
+ // set up the spark job
final SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
-
- // set up the input format
+ hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
///
try {
- final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+ // create a message-passing friendly rdd from the hadoop input format
+ JavaPairRDD<Object, SparkMessenger<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
- VertexWritable.class);
- final JavaPairRDD<Object, SparkMessenger<Object>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
- GraphComputerRDD<Object> g = GraphComputerRDD.of(rdd2);
+ VertexWritable.class)
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
- // set up the vertex program
+ // set up the vertex program and wire up configurations
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
this.vertexProgram.setup(memory);
final SerializableConfiguration vertexProgramConfiguration = new SerializableConfiguration();
this.vertexProgram.storeState(vertexProgramConfiguration);
- this.mapReducers.addAll(this.vertexProgram.getMapReducers());
ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+
// execute the vertex program
- while (true) {
- g = g.execute(vertexProgramConfiguration, memory);
- g.foreachPartition(iterator -> doNothing());
+ do {
+ graphRDD = SparkHelper.executeStep(graphRDD, this.vertexProgram, memory, vertexProgramConfiguration);
+ graphRDD.foreachPartition(iterator -> doNothing()); // i think this is a fast way to execute the rdd
+ graphRDD.cache(); // TODO: learn about persistence and caching
memory.incrIteration();
- if (this.vertexProgram.terminate(memory))
- break;
- }
+ } while (!this.vertexProgram.terminate(memory));
+
// write the output graph back to disk
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
- if (null != outputLocation) {
- try {
- FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- // map back to a <nullwritable,vertexwritable> stream for output
- g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
- .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
- NullWritable.class,
- VertexWritable.class,
- (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
- }
+ SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
} finally {
+ // must close the context or bad things happen
sparkContext.close();
}
+ sparkContext.close(); // why not try again todo
}
+ //////////////////////////////
+ // process the map reducers //
+ //////////////////////////////
final Memory.Admin finalMemory = null == memory ? new DefaultMemory() : new DefaultMemory(memory);
- // execute mapreduce jobs
for (final MapReduce mapReduce : this.mapReducers) {
- // set up the map reduce job
- final SerializableConfiguration newConfiguration = new SerializableConfiguration(apacheConfiguration);
- mapReduce.storeState(newConfiguration);
-
- // set up spark job
+ // set up the spark job
final SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + mapReduce);
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
- // set up the input format
final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+ // execute the map reduce job
try {
- final JavaPairRDD<NullWritable, VertexWritable> g = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+ final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class);
+ final SerializableConfiguration newApacheConfiguration = new SerializableConfiguration(apacheConfiguration);
+ mapReduce.storeState(newApacheConfiguration);
// map
- JavaPairRDD<?, ?> mapRDD = g.flatMapToPair(tuple -> {
- final MapReduce m = MapReduce.createMapReduce(newConfiguration);
- final SparkMapEmitter mapEmitter = new SparkMapEmitter();
- m.map(tuple._2().get(), mapEmitter);
- return mapEmitter.getEmissions();
- });
- if (mapReduce.getMapKeySort().isPresent())
- mapRDD = mapRDD.sortByKey((Comparator) mapReduce.getMapKeySort().get());
- // todo: combine
+ final JavaPairRDD mapRDD = SparkHelper.executeMap(hadoopGraphRDD, mapReduce, newApacheConfiguration);
+ // combine todo
// reduce
- JavaPairRDD<?, ?> reduceRDD = null;
- if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
- reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
- final MapReduce m = MapReduce.createMapReduce(newConfiguration);
- final SparkReduceEmitter reduceEmitter = new SparkReduceEmitter();
- m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
- return reduceEmitter.getEmissions();
- });
- if (mapReduce.getReduceKeySort().isPresent())
- reduceRDD = reduceRDD.sortByKey((Comparator) mapReduce.getReduceKeySort().get());
- }
- // write the output graph back to disk
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
- if (null != outputLocation) {
- // map back to a Hadoop stream for output
- ((null == reduceRDD) ? mapRDD : reduceRDD).mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
- ObjectWritable.class,
- ObjectWritable.class,
- (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
- // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
- // to get results you have to look through HDFS directory structure. Oh the horror.
- try {
- if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
- mapReduce.addResultToMemory(finalMemory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
- else
- HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+
+ // write the map reduce output back to disk (memory)
+ SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
} finally {
+ // must close the context or bad things happen
sparkContext.close();
}
+ sparkContext.close(); // why not try again todo
}
// update runtime and return the newly computed graph
@@ -256,6 +209,8 @@ public class SparkGraphComputer implements GraphComputer {
);
}
+ /////////////////
+
private static final void doNothing() {
// a cheap action
}
@@ -278,19 +233,18 @@ public class SparkGraphComputer implements GraphComputer {
}
}
- /////////////////
-
public static void main(final String[] args) throws Exception {
- final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-gryo.properties");
- // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
- final HadoopGraph graph = HadoopGraph.open(configuration);
- final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
- // TODO: remove everything below
- System.out.println(result);
- //result.memory().<Iterator>get(PageRankMapReduce.DEFAULT_MEMORY_KEY).forEachRemaining(System.out::println);
- //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
- result.graph().V().valueMap().forEachRemaining(System.out::println);
+ final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+ new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(configuration)).submit().get();
}
-
+ @Override
+ public Features features() {
+ return new Features() {
+ @Override
+ public boolean supportsNonSerializableObjects() {
+ return true; // TODO
+ }
+ };
+ }
}
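[Editor's note, not part of the commit] For context on how the trimmed-down entry point above is meant to be used, here is a minimal sketch of driving the computer programmatically. It is assembled only from the API surface visible in this diff (PropertiesConfiguration, HadoopGraph.open, program, submit); the properties file path and the sketch class name are placeholders.

import org.apache.commons.configuration.FileConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

public final class SparkSubmitSketch {
    public static void main(final String[] args) throws Exception {
        // placeholder path: the properties file names the input/output formats,
        // the graph locations, and the vertex program to instantiate
        final FileConfiguration configuration = new PropertiesConfiguration("conf/spark-gryo.properties");
        final HadoopGraph graph = HadoopGraph.open(configuration);
        // mirrors the simplified main() in this commit: program -> submit -> get
        final ComputerResult result = new SparkGraphComputer(graph)
                .program(VertexProgram.createVertexProgram(configuration))
                .submit()
                .get();
        // inspect the computed graph once the job completes (as the removed debug code did)
        result.graph().V().valueMap().forEachRemaining(System.out::println);
    }
}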
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 0f5acc1..6cd8885 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
private final List<Tuple2<K, V>> emissions = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
index b277e83..90bc73a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
@@ -60,7 +60,6 @@ public final class SparkMemory implements Memory.Admin, Serializable {
for (final String key : this.memoryKeys) {
this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), new RuleAccumulator()));
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
index 470774a..10b9525 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemoryAccumulator.java
@@ -23,7 +23,7 @@ import org.apache.spark.AccumulatorParam;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
+public final class SparkMemoryAccumulator implements AccumulatorParam<SparkMemory> {
@Override
public SparkMemory addAccumulator(final SparkMemory first, final SparkMemory second) {
return first;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index cc170c4..812bdd3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -28,28 +28,31 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class SparkMessenger<M> implements Serializable, Messenger<M> {
- protected Vertex vertex;
- protected List<M> incoming;
- protected Map<Object, List<M>> outgoing = new HashMap<>();
+ private Vertex vertex;
+ private List<M> incoming;
+ private Map<Object, List<M>> outgoing = new HashMap<>();
public SparkMessenger() {
}
+ public SparkMessenger(final Vertex vertex) {
+ this.vertex = vertex;
+ this.incoming = new ArrayList<>();
+ }
+
public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
this.vertex = vertex;
this.incoming = incomingMessages;
@@ -63,6 +66,22 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
this.outgoing.clear();
}
+ public Vertex getVertex() {
+ return this.vertex;
+ }
+
+ public void setVertex(final Vertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public void addIncomingMessages(final SparkMessenger<M> otherMessenger) {
+ this.incoming.addAll(otherMessenger.incoming);
+ }
+
+ public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
+ return this.outgoing.entrySet();
+ }
+
@Override
public Iterable<M> receiveMessages(final MessageScope messageScope) {
return this.incoming;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
index b9f056c..77e7072 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -27,7 +27,7 @@ import java.util.List;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
deleted file mode 100644
index 121ae2d..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/ToyVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
-* @author Marko A. Rodriguez (http://markorodriguez.com)
-*/
-public final class ToyVertex implements Vertex, Vertex.Iterators, Serializable {
-
- private final Object id;
- private static final String TOY_VERTEX = "toyVertex";
-
- public ToyVertex(final Object id) {
- this.id = id;
- }
-
- ToyVertex() {
- this.id = null;
- }
-
- @Override
- public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object id() {
- return this.id;
- }
-
- @Override
- public String label() {
- return TOY_VERTEX;
- }
-
- @Override
- public Graph graph() {
- return EmptyGraph.instance();
- }
-
- @Override
- public <V> VertexProperty<V> property(final String key, final V value) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterators iterators() {
- return this;
- }
-
- @Override
- public Iterator<Edge> edgeIterator(Direction direction, String... edgeLabels) {
- return Collections.emptyIterator();
- }
-
- @Override
- public Iterator<Vertex> vertexIterator(Direction direction, String... edgeLabels) {
- return Collections.emptyIterator();
- }
-
- @Override
- public <V> Iterator<VertexProperty<V>> propertyIterator(String... propertyKeys) {
- return Collections.emptyIterator();
- }
-
- @Override
- public int hashCode() {
- return ElementHelper.hashCode(this);
- }
-
- @Override
- public boolean equals(final Object other) {
- return ElementHelper.areEqual(this, other);
- }
-
- @Override
- public String toString() {
- return StringFactory.vertexString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
new file mode 100644
index 0000000..ece9d7c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessenger;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkHelper {
+
+ private SparkHelper() {
+ }
+
+ public static <M> JavaPairRDD<Object, SparkMessenger<M>> executeStep(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final VertexProgram<M> globalVertexProgram, final SparkMemory memory, final Configuration apacheConfiguration) {
+ JavaPairRDD<Object, SparkMessenger<M>> current = graphRDD;
+ // execute vertex program
+ current = current.mapPartitionsToPair(iterator -> { // each partition has a copy of the vertex program
+ final VertexProgram<M> vertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+ return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(iterator, tuple -> {
+ vertexProgram.execute(tuple._2().getVertex(), tuple._2(), memory);
+ return tuple;
+ });
+ });
+ // clear all previous incoming messages
+ if (!memory.isInitialIteration()) {
+ current = current.mapValues(messenger -> {
+ messenger.clearIncomingMessages();
+ return messenger;
+ });
+ }
+ // emit messages
+ current = current.<Object, SparkMessenger<M>>flatMapToPair(tuple -> {
+ final List<Tuple2<Object, SparkMessenger<M>>> list = tuple._2().getOutgoingMessages()
+ .stream()
+ .map(entry -> new Tuple2<>(entry.getKey(), new SparkMessenger<>(new DetachedVertex(entry.getKey(), Vertex.DEFAULT_LABEL, Collections.emptyMap()), entry.getValue()))) // maybe go back to toy vertex if label is expensive
+ .collect(Collectors.toList()); // the message vertices
+ list.add(new Tuple2<>(tuple._1(), tuple._2())); // the raw vertex
+ return list;
+ });
+
+ // TODO: local message combiner
+ if (globalVertexProgram.getMessageCombiner().isPresent()) {
+ /* current = current.combineByKey(messenger -> {
+ return messenger;
+ });*/
+ }
+
+ // "message pass" via reduction
+ current = current.reduceByKey((a, b) -> {
+ if (a.getVertex() instanceof DetachedVertex && !(b.getVertex() instanceof DetachedVertex))
+ a.setVertex(b.getVertex());
+ a.addIncomingMessages(b);
+ return a;
+ });
+
+ // clear all previous outgoing messages
+ current = current.mapValues(messenger -> {
+ messenger.clearOutgoingMessages();
+ return messenger;
+ });
+ return current;
+ }
+
+ public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<K, V> mapRDD = hadoopGraphRDD.flatMapToPair(tuple -> {
+ final MapReduce<K, V, ?, ?, ?> m = MapReduce.createMapReduce(apacheConfiguration); // todo create only for each partition
+ final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+ m.map(tuple._2().get(), mapEmitter);
+ return mapEmitter.getEmissions();
+ });
+ if (mapReduce.getMapKeySort().isPresent())
+ mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+ return mapRDD;
+ }
+
+ // TODO: public static executeCombine()
+
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().flatMapToPair(tuple -> {
+ final MapReduce<K, V, OK, OV, ?> m = MapReduce.createMapReduce(apacheConfiguration); // todo create only for each partition
+ final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+ m.reduce(tuple._1(), tuple._2().iterator(), reduceEmitter);
+ return reduceEmitter.getEmissions();
+ });
+ if (mapReduce.getReduceKeySort().isPresent())
+ reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+ return reduceRDD;
+ }
+
+ public static void deleteOutputDirectory(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ try {
+ FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public static <M> void saveVertexProgramRDD(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ // map back to a <nullwritable,vertexwritable> stream for output
+ graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().getVertex())))
+ .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+ NullWritable.class,
+ VertexWritable.class,
+ (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+ }
+ }
+
+ public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ // map back to a Hadoop stream for output
+ mapReduceRDD.mapToPair(tuple -> new Tuple2<>(new ObjectWritable<>(tuple._1()), new ObjectWritable<>(tuple._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+ ObjectWritable.class,
+ ObjectWritable.class,
+ (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class));
+ // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
+ // to get results you have to look through HDFS directory structure. Oh the horror.
+ try {
+ if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+ mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+ else
+ HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+}
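[Editor's note, not part of the commit] SparkHelper.executeStep is intended to be iterated by the driver until the vertex program signals termination. The sketch below condenses that loop from the SparkGraphComputer diff above into a standalone method; the class name is hypothetical, and the RDD, memory, and configuration are assumed to have been set up exactly as shown in that diff.

import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessenger;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

final class DriverLoopSketch {

    private DriverLoopSketch() {
    }

    // runs BSP supersteps until the vertex program signals termination
    static <M> JavaPairRDD<Object, SparkMessenger<M>> run(JavaPairRDD<Object, SparkMessenger<M>> graphRDD,
                                                          final VertexProgram<M> vertexProgram,
                                                          final SparkMemory memory,
                                                          final Configuration vertexProgramConfiguration) {
        do {
            // one superstep: execute the program, emit messages, reduce ("message pass"), clear outgoing
            graphRDD = SparkHelper.executeStep(graphRDD, vertexProgram, memory, vertexProgramConfiguration);
            graphRDD.foreachPartition(iterator -> {
            }); // a cheap action that forces evaluation of the otherwise lazy RDD
            memory.incrIteration();
        } while (!vertexProgram.terminate(memory));
        // the caller can then hand the result to SparkHelper.saveVertexProgramRDD(...)
        return graphRDD;
    }
}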
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 76636cd..0fb2e9f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.util.StreamFactory;
@@ -86,9 +85,9 @@ public class HadoopConfiguration extends BaseConfiguration implements Serializab
this.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
}
- public Class<? extends GraphComputer> getGraphComputer() {
+ public Class<? extends GraphComputer> getGraphComputer(final Class<? extends GraphComputer> defaultGraphComputer) {
if (!this.containsKey(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER))
- return GiraphGraphComputer.class;
+ return defaultGraphComputer;
else {
try {
return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER));
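[Editor's note, not part of the commit] With the change above, getGraphComputer no longer hard-codes Giraph as the fallback; the caller supplies the default. A minimal sketch of the new call site follows, assuming the HadoopConfiguration constructor used elsewhere in this diff is accessible; the sketch class name is hypothetical.

import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;

final class DefaultComputerSketch {

    private DefaultComputerSketch() {
    }

    static Class<? extends GraphComputer> resolve(final Configuration configuration) {
        final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
        // GREMLIN_HADOOP_DEFAULT_GRAPH_COMPUTER, when set, still wins; otherwise the
        // caller-supplied default is returned (HadoopGraph now passes GiraphGraphComputer.class)
        return hadoopConfiguration.getGraphComputer(GiraphGraphComputer.class);
    }
}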
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b6133ae7/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 8154888..990fc77 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -146,7 +146,7 @@ public class HadoopGraph implements Graph, Graph.Iterators {
private HadoopGraph(final Configuration configuration) {
this.configuration = new HadoopConfiguration(configuration);
- this.graphComputerClass = this.configuration.getGraphComputer();
+ this.graphComputerClass = this.configuration.getGraphComputer(GiraphGraphComputer.class);
}
public static HadoopGraph open() {