You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/07 22:37:58 UTC
flink git commit: [FLINK-2561] [gelly] add gelly-scala examples:
vertex-centric SSSP,
GSA SSSP and how to use a library method (connected components).
Repository: flink
Updated Branches:
refs/heads/master f9eea5e5a -> f2186a604
[FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP
and how to use a library method (connected components).
This closes #1211
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186a60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186a60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186a60
Branch: refs/heads/master
Commit: f2186a604f407e7b6db534cf6f9e50e27eac765a
Parents: f9eea5e
Author: vasia <va...@apache.org>
Authored: Thu Oct 1 22:26:25 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Wed Oct 7 22:37:08 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 4 +-
.../org/apache/flink/graph/scala/Graph.scala | 16 +-
.../scala/example/ConnectedComponents.scala | 121 +++++++++++++
.../example/GSASingleSourceShortestPaths.scala | 156 +++++++++++++++++
.../graph/scala/example/GraphMetrics.scala | 19 +--
.../example/SingleSourceShortestPaths.scala | 170 +++++++++++++++++++
.../graph/example/ConnectedComponents.java | 2 +-
.../utils/ConnectedComponentsDefaultData.java | 21 ++-
.../utils/SingleSourceShortestPathsData.java | 37 ++--
9 files changed, 501 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index fa2c86c..766b395 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -265,10 +265,10 @@ If no vertex input is provided during Graph creation, Gelly will automatically p
val env = ExecutionEnvironment.getExecutionEnvironment
// initialize the vertex value to be equal to the vertex ID
-val graph = Graph.fromCollection(edgeList, env,
+val graph = Graph.fromCollection(edgeList,
new MapFunction[Long, Long] {
def map(id: Long): Long = id
- })
+ }, env)
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 38702f3..28f3f12 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -57,8 +57,8 @@ object Graph {
* map function to the vertex ids.
*/
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment,
- mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV],
+ env: ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
}
@@ -87,8 +87,8 @@ object Graph {
* map function to the vertex ids.
*/
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment,
- mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV],
+ env: ExecutionEnvironment): Graph[K, VV, EV] = {
wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
}
@@ -120,8 +120,8 @@ object Graph {
* map function to the vertex ids.
*/
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
- TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment,
- mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+ TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV],
+ env: ExecutionEnvironment): Graph[K, VV, EV] = {
val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
}
@@ -230,7 +230,7 @@ object Graph {
// initializer provided
if (mapper != null) {
- fromTupleDataSet[K, VV, EV](edges, env, mapper)
+ fromTupleDataSet[K, VV, EV](edges, mapper, env)
}
else {
fromTupleDataSet[K, EV](edges, env)
@@ -244,7 +244,7 @@ object Graph {
// no initializer provided
if (mapper != null) {
- fromTupleDataSet[K, VV, NullValue](edges, env, mapper)
+ fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
}
else {
fromTupleDataSet[K, NullValue](edges, env)
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
new file mode 100644
index 0000000..b3da520
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.library.GSAConnectedComponents
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in [[org.apache.flink.graph.library]].
+ *
+ * In particular, this example uses the
+ * [[org.apache.flink.graph.library.GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{{
+ * ConnectedComponents <edge path> <output path> <number of iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+
+  /**
+   * Entry point: builds the graph, runs the connected components library method
+   * and emits the resulting vertices.
+   *
+   * @param args either no arguments (built-in data, result printed to stdout) or
+   *             `<edge path> <output path> <number of iterations>`
+   */
+  def main(args: Array[String]): Unit = {
+    // on invalid arguments the usage has already been printed; do nothing else
+    if (parseParameters(args)) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+      val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+      val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+      // emit result
+      if (fileOutput) {
+        components.writeAsCsv(outputPath, "\n", ",")
+        env.execute("Connected Components Example")
+      } else {
+        components.print()
+      }
+    }
+  }
+
+  /** Initializes every vertex value with the vertex's own id. */
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long): Long = id
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+  private var fileOutput = false
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+  /**
+   * Reads the command-line arguments into the fields above.
+   *
+   * @param args the program arguments
+   * @return false if arguments were given but not exactly three of them
+   *         (the usage is then printed to stderr), true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 3) {
+        System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+          "<num iterations>")
+        false
+      } else {
+        fileOutput = true
+        edgesInputPath = args(0)
+        outputPath = args(1)
+        maxIterations = args(2).toInt
+        true
+      }
+    } else {
+      System.out.println("Executing ConnectedComponents example with default parameters" +
+        " and built-in default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+      true
+    }
+  }
+
+  /**
+   * Returns the unweighted input edges: read from the tab-separated file at
+   * `edgesInputPath` when file output is enabled, otherwise built from
+   * `ConnectedComponentsDefaultData.DEFAULT_EDGES`.
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    } else {
+      val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..2dc272c
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+import org.apache.flink.graph.gsa.GatherFunction
+import org.apache.flink.graph.gsa.Neighbor
+import org.apache.flink.graph.gsa.SumFunction
+import org.apache.flink.graph.gsa.ApplyFunction
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * Usage {{{
+ * GSASingleSourceShortestPaths <source vertex id> <input edges path>
+ *   <output path> <num iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+
+  /**
+   * Entry point: builds the weighted graph, runs the gather-sum-apply iteration
+   * and emits the resulting vertex distances.
+   *
+   * @param args either no arguments (built-in data, result printed to stdout) or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   */
+  def main(args: Array[String]): Unit = {
+    // on invalid arguments the usage has already been printed; do nothing else
+    if (parseParameters(args)) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+      val graph = Graph.fromDataSet[Long, Double, Double](edges,
+        new InitVertices(srcVertexId), env)
+
+      // Execute the gather-sum-apply iteration
+      val result = graph.runGatherSumApplyIteration(new CalculateDistances,
+        new ChooseMinDistance, new UpdateDistance, maxIterations)
+
+      // Extract the vertices as the result
+      val singleSourceShortestPaths = result.getVertices
+
+      // emit result
+      if (fileOutput) {
+        singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+        env.execute("GSA Single Source Shortest Paths Example")
+      } else {
+        singleSourceShortestPaths.print()
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /** Sets the source vertex's distance to 0 and every other vertex's to positive infinity. */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+    override def map(id: Long): Double = if (id == srcId) 0.0 else Double.PositiveInfinity
+  }
+
+  /** Gather: a neighbor's distance plus the weight of the connecting edge. */
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]): Double =
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+  }
+
+  /** Sum: keeps the smaller of two candidate distances. */
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double): Double =
+      Math.min(newValue, currentValue)
+  }
+
+  /** Apply: sets the new distance only if it is shorter than the vertex's current one. */
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double): Unit = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the command-line arguments into the fields above.
+   *
+   * @param args the program arguments
+   * @return false if arguments were given but not exactly four of them
+   *         (the usage is then printed to stderr), true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      } else {
+        fileOutput = true
+        srcVertexId = args(0).toLong
+        edgesInputPath = args(1)
+        outputPath = args(2)
+        maxIterations = args(3).toInt
+        true
+      }
+    } else {
+      System.out.println("Executing GSA Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      true
+    }
+  }
+
+  /**
+   * Returns the weighted input edges: read from the tab-separated file at
+   * `edgesInputPath` when file output is enabled, otherwise built from
+   * `SingleSourceShortestPathsData.DEFAULT_EDGES`.
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
index 68d9285..4eed824 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -106,21 +106,21 @@ object GraphMetrics {
private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
if (fileOutput) {
env.readCsvFile[(Long, Long)](
- edgesPath,
- fieldDelimiter = "\t").map(
- in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
- }
- else {
+ edgesPath,
+ fieldDelimiter = "\t").map(
+ in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+ } else {
env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
- (key: Long, out: Collector[Edge[Long, NullValue]]) => {
- val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+ (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+ val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
for ( i <- 0 to numOutEdges ) {
var target: Long = ((Math.random() * numVertices) + 1).toLong
- new Edge[Long, NullValue](key, target, NullValue.getInstance())
+ // hand the random edge to the collector; an Edge that is only constructed is dropped
+ out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance()))
}
- })
- }
+ })
}
+ }
private var fileOutput: Boolean = false
private var edgesPath: String = null
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..65a8e7f
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ *
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * Usage {{{
+ * SingleSourceShortestPaths <source vertex id> <input edges path>
+ *   <output path> <num iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+
+  /**
+   * Entry point: builds the weighted graph, runs the vertex-centric iteration
+   * and emits the resulting vertex distances.
+   *
+   * @param args either no arguments (built-in data, result printed to stdout) or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   */
+  def main(args: Array[String]): Unit = {
+    // on invalid arguments the usage has already been printed; do nothing else
+    if (parseParameters(args)) {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+      val graph = Graph.fromDataSet[Long, Double, Double](edges,
+        new InitVertices(srcVertexId), env)
+
+      // Execute the vertex-centric iteration
+      val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+        new MinDistanceMessenger, maxIterations)
+
+      // Extract the vertices as the result
+      val singleSourceShortestPaths = result.getVertices
+
+      // emit result
+      if (fileOutput) {
+        singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+        env.execute("Single Source Shortest Paths Example")
+      } else {
+        singleSourceShortestPaths.print()
+      }
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  // Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /** Sets the source vertex's distance to 0 and every other vertex's to positive infinity. */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+    override def map(id: Long): Double = if (id == srcId) 0.0 else Double.PositiveInfinity
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double],
+        inMessages: MessageIterator[Double]): Unit = {
+      // Start from +infinity (the initial value of unreached vertices) rather than
+      // Double.MaxValue: a vertex whose messages are all infinite then keeps its value
+      // and is not reported as updated.
+      var minDistance = Double.PositiveInfinity
+      while (inMessages.hasNext) {
+        val msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value. Vertices that have not
+   * been reached yet (infinite distance) send nothing, since they cannot shorten a path.
+   */
+  private final class MinDistanceMessenger extends
+      MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the command-line arguments into the fields above.
+   *
+   * @param args the program arguments
+   * @return false if arguments were given but not exactly four of them
+   *         (the usage is then printed to stderr), true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      } else {
+        fileOutput = true
+        srcVertexId = args(0).toLong
+        edgesInputPath = args(1)
+        outputPath = args(2)
+        maxIterations = args(3).toInt
+        true
+      }
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println(" Provide parameters to read input data from files.")
+      System.out.println(" See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+      true
+    }
+  }
+
+  /**
+   * Returns the weighted input edges: read from the tab-separated file at
+   * `edgesInputPath` when file output is enabled, otherwise built from
+   * `SingleSourceShortestPathsData.DEFAULT_EDGES`.
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 4189602..cd52e04 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue;
* This example shows how to use Gelly's library methods.
* You can find all available library methods in {@link org.apache.flink.graph.library}.
*
- * In particular, this example uses the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents}
* library method to compute the connected components of the input graph.
*
* The input file is a plain text file and must be formatted as follows:
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
index b9556a9..67864eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.types.NullValue;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
/**
@@ -36,14 +36,19 @@ public class ConnectedComponentsDefaultData {
public static final String EDGES = "1 2\n" + "2 3\n" + "2 4\n" + "3 4";
- public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
- List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
- edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
- edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
- edges.add(new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance()));
- edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
+ public static final Object[][] DEFAULT_EDGES = new Object[][] {
+ new Object[]{1L, 2L},
+ new Object[]{2L, 3L},
+ new Object[]{2L, 4L},
+ new Object[]{3L, 4L}
+ };
- return env.fromCollection(edges);
+ public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+ List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>();
+ for (Object[] edge : DEFAULT_EDGES) {
+ edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance()));
+ }
+ return env.fromCollection(edgeList);
}
public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";
http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index cf0034a..6b985c5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -18,13 +18,13 @@
package org.apache.flink.graph.example.utils;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Provides the default data set used for the Single Source Shortest Paths example program.
* If no parameters are given to the program, the default edge data set is used.
@@ -36,22 +36,27 @@ public class SingleSourceShortestPathsData {
public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
"4\t5\t45.0\n" + "5\t1\t51.0";
- public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
- List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
- edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
- edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
- edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
- edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
- edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
- edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
- edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
- return env.fromCollection(edges);
- }
+ public static final Object[][] DEFAULT_EDGES = new Object[][] {
+ new Object[]{1L, 2L, 12.0},
+ new Object[]{1L, 3L, 13.0},
+ new Object[]{2L, 3L, 23.0},
+ new Object[]{3L, 4L, 34.0},
+ new Object[]{3L, 5L, 35.0},
+ new Object[]{4L, 5L, 45.0},
+ new Object[]{5L, 1L, 51.0}
+ };
public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "1,0.0\n" + "2,12.0\n" + "3,13.0\n" +
"4,47.0\n" + "5,48.0";
+ public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
+ for (Object[] edge : DEFAULT_EDGES) {
+ edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
+ }
+ return env.fromCollection(edgeList);
+ }
+
private SingleSourceShortestPathsData() {}
}