You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/07 22:37:58 UTC

flink git commit: [FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP and how to use a library method (connected components).

Repository: flink
Updated Branches:
  refs/heads/master f9eea5e5a -> f2186a604


[FLINK-2561] [gelly] add gelly-scala examples: vertex-centric SSSP, GSA SSSP
and how to use a library method (connected components).

This closes #1211


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2186a60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2186a60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2186a60

Branch: refs/heads/master
Commit: f2186a604f407e7b6db534cf6f9e50e27eac765a
Parents: f9eea5e
Author: vasia <va...@apache.org>
Authored: Thu Oct 1 22:26:25 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Wed Oct 7 22:37:08 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |   4 +-
 .../org/apache/flink/graph/scala/Graph.scala    |  16 +-
 .../scala/example/ConnectedComponents.scala     | 121 +++++++++++++
 .../example/GSASingleSourceShortestPaths.scala  | 156 +++++++++++++++++
 .../graph/scala/example/GraphMetrics.scala      |  19 +--
 .../example/SingleSourceShortestPaths.scala     | 170 +++++++++++++++++++
 .../graph/example/ConnectedComponents.java      |   2 +-
 .../utils/ConnectedComponentsDefaultData.java   |  21 ++-
 .../utils/SingleSourceShortestPathsData.java    |  37 ++--
 9 files changed, 501 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index fa2c86c..766b395 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -265,10 +265,10 @@ If no vertex input is provided during Graph creation, Gelly will automatically p
 val env = ExecutionEnvironment.getExecutionEnvironment
 
 // initialize the vertex value to be equal to the vertex ID
-val graph = Graph.fromCollection(edgeList, env,
+val graph = Graph.fromCollection(edgeList,
     new MapFunction[Long, Long] {
        def map(id: Long): Long = id
-    })
+    }, env)
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 38702f3..28f3f12 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -57,8 +57,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
   }
 
@@ -87,8 +87,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
   }
 
@@ -120,8 +120,8 @@ object Graph {
   * map function to the vertex ids.
   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment,
-  mapper: MapFunction[K, VV]): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV],
+      env: ExecutionEnvironment): Graph[K, VV, EV] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
   }
@@ -230,7 +230,7 @@ object Graph {
 
       // initializer provided
       if (mapper != null) {
-        fromTupleDataSet[K, VV, EV](edges, env, mapper)
+        fromTupleDataSet[K, VV, EV](edges, mapper, env)
       }
       else {
         fromTupleDataSet[K, EV](edges, env) 
@@ -244,7 +244,7 @@ object Graph {
 
       // no initializer provided
       if (mapper != null) {
-        fromTupleDataSet[K, VV, NullValue](edges, env, mapper)
+        fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
       }
       else {
         fromTupleDataSet[K, NullValue](edges, env) 

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
new file mode 100644
index 0000000..b3da520
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.library.GSAConnectedComponents
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in [[org.apache.flink.graph.library]]. 
+ * 
+ * In particular, this example uses the
+ * [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{
+ *   ConnectedComponents <edge path> <result path> <number of iterations>
+ *   }}
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+
+    // emit result
+    if (fileOutput) {
+      components.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Connected Components Example")
+    } else {
+      components.print()
+    }
+  }
+
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long) = {id}
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+    private var fileOutput = false
+    private var edgesInputPath: String = null
+    private var outputPath: String = null
+    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+    private def parseParameters(args: Array[String]): Boolean = {
+      if(args.length > 0) {
+        if(args.length != 3) {
+          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+            "<num iterations>")
+          false
+        }
+        fileOutput = true
+        edgesInputPath = args(0)
+        outputPath = args(1)
+        maxIterations = (2).toInt
+      } else {
+        System.out.println("Executing ConnectedComponents example with default parameters" +
+          " and built-in default data.")
+        System.out.println("  Provide parameters to read input data from files.")
+        System.out.println("  See the documentation for the correct format of input files.")
+        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+          "<num iterations>");
+      }
+      true
+    }
+
+    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+      if (fileOutput) {
+        env.readCsvFile[(Long, Long)](edgesInputPath,
+          lineDelimiter = "\n",
+          fieldDelimiter = "\t")
+          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+      } else {
+        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+        }
+        env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..2dc272c
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+import org.apache.flink.graph.gsa.GatherFunction
+import org.apache.flink.graph.gsa.Neighbor
+import org.apache.flink.graph.gsa.SumFunction
+import org.apache.flink.graph.gsa.ApplyFunction
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the gather-sum-apply iteration
+    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+      new UpdateDistance, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("GSA Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]) = {
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+    }
+  }
+
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double) = {
+      Math.min(newValue, currentValue)
+    }
+  }
+
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double) = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if(args.length > 0) {
+      if(args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = (3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>");
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
index 68d9285..4eed824 100644
--- a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
@@ -106,21 +106,20 @@ object GraphMetrics {
   private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
     if (fileOutput) {
       env.readCsvFile[(Long, Long)](
-          edgesPath,
-          fieldDelimiter = "\t").map(
-          in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    }
-    else {
+        edgesPath,
+        fieldDelimiter = "\t").map(
+        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
       env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-         (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-         val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
           for ( i <- 0 to numOutEdges ) {
             var target: Long = ((Math.random() * numVertices) + 1).toLong
-              new Edge[Long, NullValue](key, target, NullValue.getInstance())
+            new Edge[Long, NullValue](key, target, NullValue.getInstance())
           }
-        })
-      }
+      })
     }
+  }
 
   private var fileOutput: Boolean = false
   private var edgesPath: String = null

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..65a8e7f
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.example;
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's vertex-centric iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the vertex-centric iteration
+    val result = graph.runVertexCentricIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      var minDistance = Double.MaxValue
+      while (inMessages.hasNext) {
+        var msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      for (edge: Edge[Long, Double] <- getEdges) {
+        sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if(args.length > 0) {
+      if(args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = (3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>");
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 4189602..cd52e04 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue;
  * This example shows how to use Gelly's library methods.
  * You can find all available library methods in {@link org.apache.flink.graph.library}. 
  * 
- * In particular, this example uses the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents}
  * library method to compute the connected components of the input graph.
  *
  * The input file is a plain text file and must be formatted as follows:

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
index b9556a9..67864eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.types.NullValue;
 
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 /**
@@ -36,14 +36,19 @@ public class ConnectedComponentsDefaultData {
 
 	public static final String EDGES = "1	2\n" + "2	3\n" + "2	4\n" + "3	4";
 
-	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
-		edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(2L, 4L, NullValue.getInstance()));
-		edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L},
+		new Object[]{2L, 3L},
+		new Object[]{2L, 4L},
+		new Object[]{3L, 4L}
+	};
 
-		return env.fromCollection(edges);
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, NullValue>> edgeList = new LinkedList<Edge<Long, NullValue>>();
+		for (Object[] edge : DEFAULT_EDGES) {
+			edgeList.add(new Edge<Long, NullValue>((Long) edge[0], (Long) edge[1], NullValue.getInstance()));
+		}
+		return env.fromCollection(edgeList);
 	}
 
 	public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + "3,1\n" + "4,1";

http://git-wip-us.apache.org/repos/asf/flink/blob/f2186a60/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index cf0034a..6b985c5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.graph.example.utils;
 
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Provides the default data set used for the Single Source Shortest Paths example program.
  * If no parameters are given to the program, the default edge data set is used.
@@ -36,22 +36,27 @@ public class SingleSourceShortestPathsData {
 	public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
 					"4\t5\t45.0\n" + "5\t1\t51.0";
 
-	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
-		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-		return env.fromCollection(edges);
-	}
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L, 12.0},
+		new Object[]{1L, 3L, 13.0},
+		new Object[]{2L, 3L, 23.0},
+		new Object[]{3L, 4L, 34.0},
+		new Object[]{3L, 5L, 35.0},
+		new Object[]{4L, 5L, 45.0},
+		new Object[]{5L, 1L, 51.0}
+	};
 
 	public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =  "1,0.0\n" + "2,12.0\n" + "3,13.0\n" + 
 								"4,47.0\n" + "5,48.0";
 
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		
+		List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
+		for (Object[] edge : DEFAULT_EDGES) {
+			edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
+		}
+		return env.fromCollection(edgeList);
+	}
+
 	private SingleSourceShortestPathsData() {}
 }