You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:03 UTC
[08/20] incubator-tinkerpop git commit: SparkGraphComputer is prim
and proper. No longer using public static void main(). The GraphComputer API
is legitimately implemented. All that is left --- MapReduce engine (easy) and
GraphComputer Memory (hard).
SparkGraphComputer is prim and proper. No longer using public static void main(). The GraphComputer API is legitimately implemented. All that is left --- MapReduce engine (easy) and GraphComputer Memory (hard).
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/051994ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/051994ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/051994ae
Branch: refs/heads/master
Commit: 051994aeac31de6e02213f1de5eb258f798602eb
Parents: c98d5be
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 09:28:28 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 09:28:28 2015 -0700
----------------------------------------------------------------------
hadoop-gremlin/conf/giraph-graphson.properties | 57 ++++++++
hadoop-gremlin/conf/giraph-kryo.properties | 31 +++++
hadoop-gremlin/conf/hadoop-graphson.properties | 57 --------
hadoop-gremlin/conf/hadoop-kryo.properties | 31 -----
hadoop-gremlin/conf/spark-kryo.properties | 38 ++++++
.../computer/giraph/GiraphGraphComputer.java | 3 +-
.../computer/spark/GraphComputerRDD.java | 3 +-
.../hadoop/process/computer/spark/RDDTools.java | 46 -------
.../spark/SerializableConfiguration.java | 2 +-
.../computer/spark/SparkGraphComputer.java | 135 +++++++++++++------
.../process/computer/spark/SparkMessenger.java | 5 -
.../process/computer/spark/SparkVertex.java | 44 +++---
12 files changed, 248 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/giraph-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-graphson.properties b/hadoop-gremlin/conf/giraph-graphson.properties
new file mode 100644
index 0000000..090b0ce
--- /dev/null
+++ b/hadoop-gremlin/conf/giraph-graphson.properties
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# the graph class
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+# i/o formats for graphs and memory (i.e. computer result)
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+# i/o locations
+gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.ldjson
+gremlin.hadoop.outputLocation=output
+# deriving a complete view of the memory requires an extra mapreduce job and thus, if not needed, should be avoided
+gremlin.hadoop.deriveMemory=false
+# if the job jars are not on the classpath of every hadoop node, then they must be provided to the distributed cache at runtime
+gremlin.hadoop.jarsInDistributedCache=true
+# the vertex program to execute
+gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
+
+# It is possible to provide Giraph configuration parameters for use with GiraphGraphComputer
+############################################################################################
+giraph.minWorkers=2
+giraph.maxWorkers=2
+# giraph.useInputSplitLocality=false
+# giraph.logLevel=debug
+
+# It is possible to provide Hadoop configuration parameters.
+# Note that these parameters are provided to each MapReduce job within the entire Hadoop-Gremlin job pipeline.
+# Some of these parameters may be overwritten by Hadoop-Gremlin as deemed necessary.
+##############################################################################################################
+# mapred.linerecordreader.maxlength=5242880
+# mapred.map.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
+# mapred.reduce.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
+# mapred.map.tasks=6
+# mapred.reduce.tasks=3
+# mapred.job.reuse.jvm.num.tasks=-1
+# mapred.task.timeout=5400000
+# mapred.reduce.parallel.copies=50
+# io.sort.factor=100
+# io.sort.mb=200
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/giraph-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-kryo.properties b/hadoop-gremlin/conf/giraph-kryo.properties
new file mode 100644
index 0000000..d546da7
--- /dev/null
+++ b/hadoop-gremlin/conf/giraph-kryo.properties
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+gremlin.hadoop.deriveMemory=false
+gremlin.hadoop.jarsInDistributedCache=true
+
+gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.gio
+gremlin.hadoop.outputLocation=output
+#gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram
+#gremlin.traversalVertexProgram.traversalSupplier.type=CLASS
+#gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
+
+giraph.minWorkers=2
+giraph.maxWorkers=2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/hadoop-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-graphson.properties b/hadoop-gremlin/conf/hadoop-graphson.properties
deleted file mode 100644
index 090b0ce..0000000
--- a/hadoop-gremlin/conf/hadoop-graphson.properties
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# the graph class
-gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
-# i/o formats for graphs and memory (i.e. computer result)
-gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
-# i/o locations
-gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.ldjson
-gremlin.hadoop.outputLocation=output
-# deriving a complete view of the memory requires an extra mapreduce job and thus, if not needed, should be avoided
-gremlin.hadoop.deriveMemory=false
-# if the job jars are not on the classpath of every hadoop node, then they must be provided to the distributed cache at runtime
-gremlin.hadoop.jarsInDistributedCache=true
-# the vertex program to execute
-gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
-
-# It is possible to provide Giraph configuration parameters for use with GiraphGraphComputer
-############################################################################################
-giraph.minWorkers=2
-giraph.maxWorkers=2
-# giraph.useInputSplitLocality=false
-# giraph.logLevel=debug
-
-# It is possible to provide Hadoop configuration parameters.
-# Note that these parameters are provided to each MapReduce job within the entire Hadoop-Gremlin job pipeline.
-# Some of these parameters may be over written by Hadoop-Gremlin as deemed necessary.
-##############################################################################################################
-# mapred.linerecordreader.maxlength=5242880
-# mapred.map.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
-# mapred.reduce.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
-# mapred.map.tasks=6
-# mapred.reduce.tasks=3
-# mapred.job.reuse.jvm.num.tasks=-1
-# mapred.task.timeout=5400000
-# mapred.reduce.parallel.copies=50
-# io.sort.factor=100
-# io.sort.mb=200
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/hadoop-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-kryo.properties b/hadoop-gremlin/conf/hadoop-kryo.properties
deleted file mode 100644
index d546da7..0000000
--- a/hadoop-gremlin/conf/hadoop-kryo.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
-gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
-gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.deriveMemory=false
-gremlin.hadoop.jarsInDistributedCache=true
-
-gremlin.hadoop.inputLocation=tinkerpop-modern-vertices.gio
-gremlin.hadoop.outputLocation=output
-#gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram
-#gremlin.traversalVertexProgram.traversalSupplier.type=CLASS
-#gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
-
-giraph.minWorkers=2
-giraph.maxWorkers=2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
new file mode 100644
index 0000000..de4df3b
--- /dev/null
+++ b/hadoop-gremlin/conf/spark-kryo.properties
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
+gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat
+gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoOutputFormat
+gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+gremlin.hadoop.deriveMemory=false
+gremlin.hadoop.jarsInDistributedCache=true
+
+gremlin.hadoop.inputLocation=hdfs://localhost:9000/user/marko/tinkerpop-modern-vertices.gio
+gremlin.hadoop.outputLocation=output
+
+# the vertex program to execute
+gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram
+
+# It is possible to provide Spark configuration parameters for use with SparkGraphComputer
+##########################################################################################
+spark.master=local[4]
+spark.executor.memory=1024m
+spark.eventLog.enabled=true
+#spark.serializer=org.apache.spark.serializer.KryoSerializer
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index e382699..7a5e362 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -222,8 +222,7 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
public static void main(final String[] args) throws Exception {
try {
- final FileConfiguration configuration = new PropertiesConfiguration();
- configuration.load(new File(args[0]));
+ final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
final GiraphGraphComputer computer = new GiraphGraphComputer(HadoopGraph.open(configuration));
computer.program(VertexProgram.createVertexProgram(configuration)).submit().get();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
index abf0ac6..786e5af 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/GraphComputerRDD.java
@@ -57,7 +57,7 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
});
});
// clear all previous incoming messages
- if(!memory.isInitialIteration()) {
+ if (!memory.isInitialIteration()) {
current = current.mapValues(messenger -> {
messenger.clearIncomingMessages();
return messenger;
@@ -97,6 +97,7 @@ public class GraphComputerRDD<M> extends JavaPairRDD<Object, SparkMessenger<M>>
//////////////
+ // TODO: What the hell is this for?
@Override
public JavaRDD zipPartitions(JavaRDDLike uJavaRDDLike, FlatMapFunction2 iteratorIteratorVFlatMapFunction2) {
return (JavaRDD) new JavaRDD<>(null, null);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
deleted file mode 100644
index cef6040..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RDDTools.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class RDDTools {
-
- public static <M> void sendMessage(final Tuple2<Vertex, List<M>> tuple, final M message) {
- tuple._2().add(message);
- }
-
- public static <M> Iterable<M> receiveMessages(final Tuple2<Vertex, List<M>> tuple) {
- return tuple._2();
- }
-
- public static <M> JavaPairRDD<Vertex, List<M>> endIteration(final JavaPairRDD<Vertex, List<M>> graph) {
- return graph.flatMapToPair(tuple -> tuple._2().stream().map(message -> new Tuple2<>(tuple._1(), Arrays.asList(message))).collect(Collectors.toList()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
index a71b456..b4a8005 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SerializableConfiguration.java
@@ -28,7 +28,7 @@ import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SerializableConfiguration extends AbstractConfiguration implements Serializable {
+public final class SerializableConfiguration extends AbstractConfiguration implements Serializable {
private final Map<String, Object> configurations = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 107f1bc..774c3c7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -18,20 +18,28 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.kryo.KryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
@@ -57,46 +65,14 @@ public class SparkGraphComputer implements GraphComputer {
protected final SparkConf configuration = new SparkConf();
protected final HadoopGraph hadoopGraph;
-
private boolean executed = false;
- private final Set<MapReduce> mapReduces = new HashSet<>();
+ private final Set<MapReduce> mapReducers = new HashSet<>();
private VertexProgram vertexProgram;
public SparkGraphComputer(final HadoopGraph hadoopGraph) {
this.hadoopGraph = hadoopGraph;
}
- public static void main(final String[] args) throws IOException {
- final SparkConf configuration = new SparkConf();
- configuration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX);
- configuration.setMaster("local");
- final JavaSparkContext sc = new JavaSparkContext(configuration);
- //JavaRDD<String> rdd = sc.textFile("hdfs://localhost:9000/user/marko/religious-traversals.txt");
- final Configuration conf = new Configuration();
- conf.set("mapred.input.dir", "hdfs://localhost:9000/user/marko/grateful-dead-vertices.gio");
- JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(conf, KryoInputFormat.class, NullWritable.class, VertexWritable.class);
- JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
-
- GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
-
- final org.apache.commons.configuration.Configuration vertexProgram = new SerializableConfiguration();
- final PageRankVertexProgram pageRankVertexProgram = PageRankVertexProgram.build().create();
- pageRankVertexProgram.storeState(vertexProgram);
- final SparkMemory memory = new SparkMemory(Collections.emptySet());
-
- while (!pageRankVertexProgram.terminate(memory)) {
- g = g.execute(vertexProgram, memory);
- g.foreachPartition(iterator -> doNothing());
- memory.incrIteration();
- }
- g.foreach(t -> System.out.println(t._2().vertex.property(PageRankVertexProgram.PAGE_RANK) + "-->" + t._2().vertex.value("name")));
- System.out.println(g.count());
- }
-
- private static final void doNothing() {
- }
-
-
@Override
public GraphComputer isolation(final Isolation isolation) {
if (!isolation.equals(Isolation.BSP))
@@ -112,7 +88,7 @@ public class SparkGraphComputer implements GraphComputer {
@Override
public GraphComputer mapReduce(final MapReduce mapReduce) {
- this.mapReduces.add(mapReduce);
+ this.mapReducers.add(mapReduce);
return this;
}
@@ -129,16 +105,95 @@ public class SparkGraphComputer implements GraphComputer {
this.executed = true;
// it is not possible execute a computer if it has no vertex program nor mapreducers
- if (null == this.vertexProgram && this.mapReduces.isEmpty())
+ if (null == this.vertexProgram && this.mapReducers.isEmpty())
throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
// it is possible to run mapreducers without a vertex program
if (null != this.vertexProgram)
GraphComputerHelper.validateProgramOnComputer(this, vertexProgram);
- final long startTime = System.currentTimeMillis();
+ final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.hadoopGraph.configuration());
+ final SparkMemory memory = new SparkMemory(Collections.emptySet());
+
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
- return null;
- });
+ final long startTime = System.currentTimeMillis();
+ // load the graph
+ if (null != this.vertexProgram) {
+ final SparkConf sparkConfiguration = new SparkConf();
+ sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + this.vertexProgram);
+ hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+ if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
+ hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+
+ // set up the input format
+ final JavaSparkContext sc = new JavaSparkContext(sparkConfiguration);
+ final JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(hadoopConfiguration,
+ (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
+ NullWritable.class,
+ VertexWritable.class);
+ final JavaPairRDD<Object, SparkMessenger<Double>> rdd2 = rdd.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()), new ArrayList<>())));
+ GraphComputerRDD<Double> g = GraphComputerRDD.of(rdd2);
+
+ // set up the vertex program
+ this.vertexProgram.setup(memory);
+ final org.apache.commons.configuration.Configuration vertexProgramConfiguration = new SerializableConfiguration();
+ this.vertexProgram.storeState(vertexProgramConfiguration);
+
+ // execute the vertex program
+ while (true) {
+ g = g.execute(vertexProgramConfiguration, memory);
+ g.foreachPartition(iterator -> doNothing());
+ memory.incrIteration();
+ if (this.vertexProgram.terminate(memory))
+ break;
+ }
+ // write the output graph back to disk
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ try {
+ FileSystem.get(hadoopConfiguration).delete(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ // map back to a <nullwritable,vertexwritable> stream for output
+ g.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable<>(tuple._2().vertex)))
+ .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.SYSTEM_G,
+ NullWritable.class,
+ VertexWritable.class,
+ (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class));
+ }
+ }
+
+ // execute mapreduce jobs
+ for (final MapReduce mapReduce : this.mapReducers) {
+ //TODO
+ /* g.mapValues(messenger -> {
+ mapReduce.map(messenger.vertex, null);
+ return messenger;
+ }).combine().reduce();*/
+ }
+ // update runtime and return the newly computed graph
+ memory.setRuntime(System.currentTimeMillis() - startTime);
+ memory.complete();
+ return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph), memory.asImmutable());
+ }
+ );
+ }
+
+ private static final void doNothing() {
+ // a cheap action
}
+ /////////////////
+
+ public static void main(final String[] args) throws Exception {
+ final FileConfiguration configuration = new PropertiesConfiguration("/Users/marko/software/tinkerpop/tinkerpop3/hadoop-gremlin/conf/spark-kryo.properties");
+ // TODO: final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
+ final HadoopGraph graph = HadoopGraph.open(configuration);
+ final ComputerResult result = new SparkGraphComputer(graph).program(VertexProgram.createVertexProgram(configuration)).submit().get();
+ System.out.println(result);
+ //result.graph().configuration().getKeys().forEachRemaining(key -> System.out.println(key + "-->" + result.graph().configuration().getString(key)));
+ result.graph().V().valueMap().forEachRemaining(System.out::println);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index b18940a..cc170c4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -64,11 +64,6 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
}
@Override
- public String toString() {
- return "messageBox[incoming(" + this.incoming.size() + "):outgoing(" + this.outgoing.size() + ")]";
- }
-
- @Override
public Iterable<M> receiveMessages(final MessageScope messageScope) {
+ // NOTE(review): messageScope is not consulted; every incoming message is returned regardless of scope.
return this.incoming;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/051994ae/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
index 5a81017..38f8a61 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
@@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
@@ -38,54 +39,47 @@ import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
+public final class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
+ // NOTE(review): KRYO_WRITER/KRYO_READER are non-final statics shared by every SparkVertex in the JVM.
+ // They could be final; also confirm KryoWriter/KryoReader tolerate concurrent use by parallel Spark tasks.
private static KryoWriter KRYO_WRITER = KryoWriter.build().create();
private static KryoReader KRYO_READER = KryoReader.build().create();
+ // hidden graph-variable key holding this vertex's id; inflateVertex() uses it to find the vertex after decoding
private static final String VERTEX_ID = Graph.Hidden.hide("giraph.gremlin.vertexId");
+ // the live vertex; transient, so Java serialization writes vertexBytes instead
private transient TinkerVertex vertex;
- private byte[] serializedForm;
+ // Kryo-encoded graph containing the vertex: filled by deflateVertex(), cleared by inflateVertex()
+ private byte[] vertexBytes;
+ /**
+ * Wraps the given TinkerVertex. The vertex is kept inflated; it is only Kryo-encoded when this wrapper
+ * is Java-serialized (see writeObject).
+ *
+ * @param vertex the vertex to wrap; its id is recorded in its graph's variables under VERTEX_ID
+ */
public SparkVertex(final TinkerVertex vertex) {
this.vertex = vertex;
+ // lets inflateVertex() locate this vertex again in the decoded graph
this.vertex.graph().variables().set(VERTEX_ID, this.vertex.id());
- this.deflateVertex();
}
+ // Vertex methods delegate straight to the wrapped TinkerVertex; they no longer inflate lazily and assume this.vertex is non-null.
@Override
- public Edge addEdge(String label, Vertex inVertex, Object... keyValues) {
- inflateVertex();
+ public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
return this.vertex.addEdge(label, inVertex, keyValues);
}
@Override
public Object id() {
- inflateVertex();
return this.vertex.id();
}
@Override
public String label() {
- inflateVertex();
return this.vertex.label();
}
@Override
public Graph graph() {
- inflateVertex();
return this.vertex.graph();
}
@Override
- public <V> VertexProperty<V> property(String key, V value) {
- inflateVertex();
+ public <V> VertexProperty<V> property(final String key, final V value) {
return this.vertex.property(key, value);
}
@Override
public void remove() {
- inflateVertex();
this.vertex.remove();
}
@@ -95,51 +89,59 @@ public class SparkVertex implements Vertex, Vertex.Iterators, Serializable {
}
+ // Vertex.Iterators methods delegate to the wrapped TinkerVertex's iterators() and likewise assume this.vertex is non-null.
@Override
- public Iterator<Edge> edgeIterator(Direction direction, String... edgeLabels) {
- inflateVertex();
+ public Iterator<Edge> edgeIterator(final Direction direction, final String... edgeLabels) {
return this.vertex.iterators().edgeIterator(direction, edgeLabels);
}
@Override
- public Iterator<Vertex> vertexIterator(Direction direction, String... edgeLabels) {
- inflateVertex();
+ public Iterator<Vertex> vertexIterator(final Direction direction, final String... edgeLabels) {
return this.vertex.iterators().vertexIterator(direction, edgeLabels);
}
@Override
- public <V> Iterator<VertexProperty<V>> propertyIterator(String... propertyKeys) {
- inflateVertex();
+ public <V> Iterator<VertexProperty<V>> propertyIterator(final String... propertyKeys) {
return this.vertex.iterators().propertyIterator(propertyKeys);
}
+ ///////////////////////////////
+
+ // Java serialization hooks: the TinkerVertex field is transient, so the vertex travels as vertexBytes.
+ // writeObject encodes it just before the default write; readObject decodes it right after the default read.
private void writeObject(final ObjectOutputStream outputStream) throws IOException {
- this.inflateVertex();
+ // deflateVertex() nulls this.vertex, but the Vertex methods dereference it directly, so serializing must not
+ // leave this instance unusable: keep the live vertex and put it back once the bytes have been written.
+ // Clearing vertexBytes afterwards also makes the next serialization re-encode the (possibly mutated) vertex
+ // rather than hit deflateVertex()'s early return and reuse stale bytes.
+ final TinkerVertex liveVertex = this.vertex;
this.deflateVertex();
+ try {
outputStream.defaultWriteObject();
+ } finally {
+ if (null != liveVertex) {
+ this.vertex = liveVertex;
+ this.vertexBytes = null;
+ }
+ }
}
+ private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
+ inputStream.defaultReadObject();
+ this.inflateVertex();
+ }
+
+ /**
+ * Rebuilds the transient vertex from vertexBytes unless it is already present: the bytes are decoded with
+ * KRYO_READER into a fresh TinkerGraph and the vertex is looked up by the id stored under VERTEX_ID.
+ *
+ * @throws IllegalStateException wrapping any IOException raised while decoding
+ */
private final void inflateVertex() {
if (null != this.vertex)
return;
try {
- final ByteArrayInputStream bis = new ByteArrayInputStream(this.serializedForm);
+ final ByteArrayInputStream bis = new ByteArrayInputStream(this.vertexBytes);
final TinkerGraph tinkerGraph = TinkerGraph.open();
KRYO_READER.readGraph(bis, tinkerGraph);
bis.close();
this.vertex = (TinkerVertex) tinkerGraph.iterators().vertexIterator(tinkerGraph.variables().get(VERTEX_ID).get()).next();
+ // drop the encoded form only after the vertex has been rebuilt, so a failed lookup does not lose the bytes
+ this.vertexBytes = null;
- } catch (final Exception e) {
+ } catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private final void deflateVertex() {
+ if (null != this.vertexBytes)
+ return;
+
try {
final ByteArrayOutputStream bos = new ByteArrayOutputStream();
KRYO_WRITER.writeGraph(bos, this.vertex.graph());
bos.flush();
bos.close();
- this.serializedForm = bos.toByteArray();
+ this.vertex = null;
+ this.vertexBytes = bos.toByteArray();
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}