Posted to commits@flink.apache.org by se...@apache.org on 2015/10/01 11:54:18 UTC

[7/9] flink git commit: [FLINK-2561] [gelly] add Scala Gelly docs

[FLINK-2561] [gelly] add Scala Gelly docs

This closes #1204


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/937793e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/937793e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/937793e1

Branch: refs/heads/master
Commit: 937793e17a177e6165f8726b4f0de3b7fa197e45
Parents: f2ea4e4
Author: vasia <va...@apache.org>
Authored: Wed Sep 30 17:53:10 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 1 11:04:00 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md | 653 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 618 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/937793e1/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index b6a0533..0c3748b 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -23,7 +23,7 @@ under the License.
 
 <a href="#top"></a>
 
-Gelly is a Java Graph API for Flink. It contains a set of methods and utilities which aim to simplify the development of graph analysis applications in Flink. In Gelly, graphs can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Gelly provides methods to create, transform and modify graphs, as well as a library of graph algorithms.
+Gelly is a Graph API for Flink. It contains a set of methods and utilities which aim to simplify the development of graph analysis applications in Flink. In Gelly, graphs can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Gelly provides methods to create, transform and modify graphs, as well as a library of graph algorithms.
 
 * This will be replaced by the TOC
 {:toc}
@@ -35,17 +35,30 @@ Gelly is currently part of the *staging* Maven project. All relevant classes are
 
 Add the following dependency to your `pom.xml` to use Gelly.
 
-~~~xml
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-gelly</artifactId>
     <version>{{site.version}}</version>
 </dependency>
-~~~
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-gelly-scala</artifactId>
+    <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
 
 Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here](../apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink Java API. After reading this guide, you might also want to check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ "Gelly examples" %}.
+The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API. After reading this guide, you might also want to check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ "Gelly examples" %}.
 
 Graph Representation
 -----------
@@ -54,6 +67,8 @@ In Gelly, a `Graph` is represented by a `DataSet` of vertices and a `DataSet` of
 
 The `Graph` nodes are represented by the `Vertex` type. A `Vertex` is defined by a unique ID and a value. `Vertex` IDs should implement the `Comparable` interface. Vertices without value can be represented by setting the value type to `NullValue`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // create a new vertex with a Long ID and a String value
 Vertex<Long, String> v = new Vertex<Long, String>(1L, "foo");
@@ -61,9 +76,23 @@ Vertex<Long, String> v = new Vertex<Long, String>(1L, "foo");
 // create a new vertex with a Long ID and no value
 Vertex<Long, NullValue> v = new Vertex<Long, NullValue>(1L, NullValue.getInstance());
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// create a new vertex with a Long ID and a String value
+val v = new Vertex(1L, "foo")
+
+// create a new vertex with a Long ID and no value
+val v = new Vertex(1L, NullValue.getInstance())
+{% endhighlight %}
+</div>
+</div>
 
 The graph edges are represented by the `Edge` type. An `Edge` is defined by a source ID (the ID of the source `Vertex`), a target ID (the ID of the target `Vertex`) and an optional value. The source and target IDs should be of the same type as the `Vertex` IDs. Edges with no value have a `NullValue` value type.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Edge<Long, Double> e = new Edge<Long, Double>(1L, 2L, 0.5);
 
@@ -72,6 +101,19 @@ Edge<Long, Double> reversed = e.reverse();
 
 Double weight = e.getValue(); // weight = 0.5
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val e = new Edge(1L, 2L, 0.5)
+
+// reverse the source and target of this edge
+val reversed = e.reverse
+
+val weight = e.getValue // weight = 0.5
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -82,6 +124,8 @@ You can create a `Graph` in the following ways:
 
 * from a `DataSet` of edges and an optional `DataSet` of vertices:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -91,9 +135,25 @@ DataSet<Edge<String, Double>> edges = ...
 
 Graph<String, Long, Double> graph = Graph.fromDataSet(vertices, edges, env);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val vertices: DataSet[Vertex[String, Long]] = ...
+
+val edges: DataSet[Edge[String, Double]] = ...
+
+val graph = Graph.fromDataSet(vertices, edges, env)
+{% endhighlight %}
+</div>
+</div>
 
 * from a `DataSet` of `Tuple3` and an optional `DataSet` of `Tuple2`. In this case, Gelly will convert each `Tuple3` to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each `Tuple2` will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -111,7 +171,6 @@ Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTup
 - `vertexTypes(Class<K> vertexKey, Class<VV> vertexValue)`: the Graph has vertex values, but no edge values.
 - `keyType(Class<K> vertexKey)`: the Graph has no vertex values and no edge values.
 
-
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -123,10 +182,26 @@ Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input",
 // create a Graph with no Vertex or Edge values
 Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexTuples = env.readCsvFile[String, Long]("path/to/vertex/input")
+
+val edgeTuples = env.readCsvFile[String, String, Double]("path/to/edge/input")
+
+val graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)
+{% endhighlight %}
+</div>
+</div>
 
 
 * from a `Collection` of edges and an optional `Collection` of vertices:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -143,13 +218,39 @@ If no vertex input is provided during Graph creation, Gelly will automatically p
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 // initialize the vertex value to be equal to the vertex ID
-Graph<Long, Long, String> graph = Graph.fromCollection(edges, 
+Graph<Long, Long, String> graph = Graph.fromCollection(edgeList, 
 				new MapFunction<Long, Long>() {
 					public Long map(Long value) { 
 						return value; 
 					} 
 				}, env);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexList = List(...)
+
+val edgeList = List(...)
+
+val graph = Graph.fromCollection(vertexList, edgeList, env)
+{% endhighlight %}
+
+If no vertex input is provided during Graph creation, Gelly will automatically produce the `Vertex` `DataSet` from the edge input. In this case, the created vertices will have no values. Alternatively, you can provide a `MapFunction` as an argument to the creation method, in order to initialize the `Vertex` values:
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// initialize the vertex value to be equal to the vertex ID
+val graph = Graph.fromCollection(edgeList, env,
+    new MapFunction[Long, Long] {
+       def map(id: Long): Long = id
+    })
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -158,6 +259,8 @@ Graph Properties
 
 Gelly includes the following methods for retrieving various Graph properties and metrics:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // get the Vertex DataSet
 DataSet<Vertex<K, VV>> getVertices()
@@ -190,6 +293,43 @@ long numberOfEdges()
 DataSet<Triplet<K, VV, EV>> getTriplets()
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// get the Vertex DataSet
+getVertices: DataSet[Vertex[K, VV]]
+
+// get the Edge DataSet
+getEdges: DataSet[Edge[K, EV]]
+
+// get the IDs of the vertices as a DataSet
+getVertexIds: DataSet[K]
+
+// get the source-target pairs of the edge IDs as a DataSet
+getEdgeIds: DataSet[(K, K)]
+
+// get a DataSet of <vertex ID, in-degree> pairs for all vertices
+inDegrees: DataSet[(K, Long)]
+
+// get a DataSet of <vertex ID, out-degree> pairs for all vertices
+outDegrees: DataSet[(K, Long)]
+
+// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
+getDegrees: DataSet[(K, Long)]
+
+// get the number of vertices
+numberOfVertices: Long
+
+// get the number of edges
+numberOfEdges: Long
+
+// get a DataSet of Triplets<srcVertex, trgVertex, edge>
+getTriplets: DataSet[Triplet[K, VV, EV]]
+
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -198,6 +338,8 @@ Graph Transformations
 
 * <strong>Map</strong>: Gelly provides specialized methods for applying a map transformation on the vertex values or edge values. `mapVertices` and `mapEdges` return a new `Graph`, where the IDs of the vertices (or edges) remain unchanged, while the values are transformed according to the provided user-defined map function. The map functions also allow changing the type of the vertex or edge values.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env);
@@ -210,9 +352,23 @@ Graph<Long, Long, Long> updatedGraph = graph.mapVertices(
 					}
 				});
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val graph = Graph.fromDataSet(vertices, edges, env)
+
+// increment each vertex value by one
+val updatedGraph = graph.mapVertices(v => v.getValue + 1)
+{% endhighlight %}
+</div>
+</div>
 
 * <strong>Filter</strong>: A filter transformation applies a user-defined filter function on the vertices or edges of the `Graph`. `filterOnEdges` will create a sub-graph of the original graph, keeping only the edges that satisfy the provided predicate. Note that the vertex dataset will not be modified. Respectively, `filterOnVertices` applies a filter on the vertices of the graph. Edges whose source and/or target do not satisfy the vertex predicate are removed from the resulting edge dataset. The `subgraph` method can be used to apply a filter function to the vertices and the edges at the same time.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Graph<Long, Long, Long> graph = ...
 
@@ -230,6 +386,18 @@ graph.subgraph(
 				}
 		})
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val graph: Graph[Long, Long, Long] = ...
+
+// keep only vertices with positive values
+// and only edges with negative values
+graph.subgraph((vertex => vertex.getValue > 0), (edge => edge.getValue < 0))
+{% endhighlight %}
+</div>
+</div>
 
 <p class="text-center">
     <img alt="Filter Transformations" width="80%" src="fig/gelly-filter.png"/>
@@ -239,6 +407,8 @@ graph.subgraph(
 Similarly, an input dataset can be joined with the edges, using one of three methods. `joinWithEdges` expects an input `DataSet` of `Tuple3` and joins on the composite key of both source and target vertex IDs. `joinWithEdgesOnSource` expects a `DataSet` of `Tuple2` and joins on the source key of the edges and the first attribute of the input dataset and `joinWithEdgesOnTarget` expects a `DataSet` of `Tuple2` and joins on the target key of the edges and the first attribute of the input dataset. All three methods apply a map function on the edge and the input data set values.
 Note that if the input dataset contains a key multiple times, all Gelly join methods will only consider the first value encountered.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Graph<Long, Double, Double> network = ...
 
@@ -252,6 +422,19 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
 					}
 				});
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val network: Graph[Long, Double, Double] = ...
+
+val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees
+
+// assign the transition probabilities as the edge weights
+val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2)
+{% endhighlight %}
+</div>
+</div>
 
 * <strong>Reverse</strong>: the `reverse()` method returns a new `Graph` where the direction of all edges has been reversed.
 
@@ -272,6 +455,8 @@ Graph Mutations
 
 Gelly includes the following methods for adding and removing vertices and edges from an input `Graph`:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // adds a Vertex to the Graph. If the Vertex already exists, it will not be added again.
 Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex)
@@ -296,9 +481,37 @@ Graph<K, VV, EV> removeEdge(Edge<K, EV> edge)
 
 // removes *all* edges that match the edges in the given list
 Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved)
+{% endhighlight %}
+</div>
 
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// adds a Vertex to the Graph. If the Vertex already exists, it will not be added again.
+addVertex(vertex: Vertex[K, VV])
 
+// adds a list of vertices to the Graph. If the vertices already exist in the graph, they will not be added once more.
+addVertices(verticesToAdd: List[Vertex[K, VV]])
+
+// adds an Edge to the Graph. If the source and target vertices do not exist in the graph, they will also be added.
+addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV)
+
+// adds a list of edges to the Graph. When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
+addEdges(edges: List[Edge[K, EV]])
+
+// removes the given Vertex and its edges from the Graph.
+removeVertex(vertex: Vertex[K, VV])
+
+// removes the given list of vertices and their edges from the Graph
+removeVertices(verticesToBeRemoved: List[Vertex[K, VV]])
+
+// removes *all* edges that match the given Edge from the Graph.
+removeEdge(edge: Edge[K, EV])
+
+// removes *all* edges that match the edges in the given list
+removeEdges(edgesToBeRemoved: List[Edge[K, EV]])
 {% endhighlight %}
+</div>
+</div>
 
 Neighborhood Methods
 -----------
@@ -315,6 +528,8 @@ For example, assume that you want to select the minimum weight of all out-edges
 
 The following code will collect the out-edges for each vertex and apply the `SelectMinWeight()` user-defined function on each of the resulting neighborhoods:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Graph<Long, Long, Double> graph = ...
 
@@ -329,6 +544,23 @@ static final class SelectMinWeight implements ReduceEdgesFunction<Double> {
 		}
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val graph: Graph[Long, Long, Double] = ...
+
+val minWeights = graph.reduceOnEdges(new SelectMinWeight, EdgeDirection.OUT)
+
+// user-defined function to select the minimum weight
+final class SelectMinWeight extends ReduceEdgesFunction[Double] {
+	override def reduceEdges(firstEdgeValue: Double, secondEdgeValue: Double): Double = {
+		Math.min(firstEdgeValue, secondEdgeValue)
+	}
+ }
+{% endhighlight %}
+</div>
+</div>
 
 <p class="text-center">
     <img alt="reduceOnEdges Example" width="50%" src="fig/gelly-reduceOnEdges.png"/>
@@ -336,6 +568,8 @@ static final class SelectMinWeight implements ReduceEdgesFunction<Double> {
 
 Similarly, assume that you would like to compute the sum of the values of all in-coming neighbors, for every vertex. The following code will collect the in-coming neighbors for each vertex and apply the `SumValues()` user-defined function on each neighborhood:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Graph<Long, Long, Double> graph = ...
 
@@ -350,6 +584,23 @@ static final class SumValues implements ReduceNeighborsFunction<Long> {
 	  	}
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val graph: Graph[Long, Long, Double] = ...
+
+val verticesWithSum = graph.reduceOnNeighbors(new SumValues, EdgeDirection.IN)
+
+// user-defined function to sum the neighbor values
+final class SumValues extends ReduceNeighborsFunction[Long] {
+   	override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = {
+    	firstNeighbor + secondNeighbor
+    }
+}
+{% endhighlight %}
+</div>
+</div>
 
 <p class="text-center">
     <img alt="reduceOnNeighbors Example" width="70%" src="fig/gelly-reduceOnNeighbors.png"/>
@@ -361,6 +612,8 @@ These methods return zero, one or more values per vertex and provide access to t
 
 For example, the following code will output all the vertex pairs which are connected with an edge having a weight of 0.5 or more:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 Graph<Long, Long, Double> graph = ...
 
@@ -383,6 +636,32 @@ static final class SelectLargeWeightNeighbors implements NeighborsFunctionWithVe
 		}
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val graph: Graph[Long, Long, Double] = ...
+
+val vertexPairs = graph.groupReduceOnNeighbors(new SelectLargeWeightNeighbors, EdgeDirection.OUT)
+
+// user-defined function to select the neighbors which have edges with weight > 0.5
+final class SelectLargeWeightNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Double, 
+  (Vertex[Long, Long], Vertex[Long, Long])] {
+
+	override def iterateNeighbors(vertex: Vertex[Long, Long],
+		neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Long])],
+		out: Collector[(Vertex[Long, Long], Vertex[Long, Long])]) = {
+
+			for (neighbor <- neighbors) {
+				if (neighbor._1.getValue() > 0.5) {
+					out.collect(vertex, neighbor._2);
+				}
+			}
+		}
+   }
+{% endhighlight %}
+</div>
+</div>
 
 When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
 
@@ -414,6 +693,8 @@ Let us consider computing Single-Source-Shortest-Paths with vertex-centric itera
     <img alt="Vertex-centric SSSP superstep 2" width="70%" src="fig/gelly-vc-sssp2.png"/>
 </p>
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %} 
 // read the input graph
 Graph<Long, Double, Double> graph = ...
@@ -432,19 +713,19 @@ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
 // - - -  UDFs - - - //
 
 // messaging
-public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
 
-	public void sendMessages(Vertex<K, Double> vertex) {
-		for (Edge<K, Double> edge : getEdges()) {
+	public void sendMessages(Vertex<Long, Double> vertex) {
+		for (Edge<Long, Double> edge : getEdges()) {
 			sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
 		}
 	}
 }
 
 // vertex update
-public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
 
-	public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
+	public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
 		Double minDistance = Double.MAX_VALUE;
 
 		for (double msg : inMessages) {
@@ -460,6 +741,56 @@ public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<
 }
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %} 
+// read the input graph
+val graph: Graph[Long, Double, Double] = ...
+
+// define the maximum number of iterations
+val maxIterations = 10
+
+// Execute the vertex-centric iteration
+val result = graph.runVertexCentricIteration(new VertexDistanceUpdater, new MinDistanceMessenger, maxIterations)
+
+// Extract the vertices as the result
+val singleSourceShortestPaths = result.getVertices
+
+
+// - - -  UDFs - - - //
+
+// messaging
+final class MinDistanceMessenger extends MessagingFunction[Long, Double, Double, Double] {
+
+	override def sendMessages(vertex: Vertex[Long, Double]) = {
+		for (edge: Edge[Long, Double] <- getEdges) {
+			sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+		}
+	}
+}
+
+// vertex update
+final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+	override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) = {
+		var minDistance = Double.MaxValue
+
+		while (inMessages.hasNext) {
+		  val msg = inMessages.next
+		  if (msg < minDistance) {
+			minDistance = msg
+		  }
+		}
+
+		if (vertex.getValue > minDistance) {
+		  setNewVertexValue(minDistance)
+		}
+	}
+}
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -488,6 +819,8 @@ If the degrees option is not set in the configuration, these methods will return
 
 * <strong>Messaging Direction</strong>: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to either `EdgeDirection.IN`, `EdgeDirection.OUT`, `EdgeDirection.ALL`. The messaging direction also dictates the update direction which would be `EdgeDirection.OUT`, `EdgeDirection.IN` and `EdgeDirection.ALL`, respectively. This property can be set using the `setDirection()` method.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 
 Graph<Long, Double, Double> graph = ...
@@ -521,7 +854,7 @@ public static final class VertexUpdater extends VertexUpdateFunction {
 	}
 
 
-	public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator inMessages) {
+	public void updateVertex(Vertex<Long, Long> vertex, MessageIterator inMessages) {
 		
 		//do some computation
 		Long partialValue = ...
@@ -537,9 +870,62 @@ public static final class VertexUpdater extends VertexUpdateFunction {
 public static final class Messenger extends MessagingFunction {...}
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, Double, Double] = ...
+
+val parameters = new VertexCentricConfiguration
+
+// set the iteration name
+parameters.setName("Gelly Iteration")
+
+// set the parallelism
+parameters.setParallelism(16)
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator)
+
+// run the vertex-centric iteration, also passing the configuration parameters
+val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+
+// user-defined functions
+final class VertexUpdater extends VertexUpdateFunction {
+
+	var aggregator = new LongSumAggregator
+
+	override def preSuperstep {
+	
+		// retrieve the Aggregator
+		aggregator = getIterationAggregator("sumAggregator")
+	}
+
+
+	override def updateVertex(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]) {
+		
+		//do some computation
+		val partialValue = ...
+
+		// aggregate the partial value
+		aggregator.aggregate(partialValue)
+
+		// update the vertex value
+		setNewVertexValue(...)
+	}
+}
+
+final class Messenger extends MessagingFunction {...}
+
+{% endhighlight %}
+</div>
+</div>
 
 The following example illustrates the usage of the degree as well as the number of vertices options.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 
 Graph<Long, Double, Double> graph = ...
@@ -574,11 +960,49 @@ public static final class Messenger {
 }
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, Double, Double] = ...
+
+// configure the iteration
+val parameters = new VertexCentricConfiguration
+
+// set the number of vertices option to true
+parameters.setOptNumVertices(true)
+
+// set the degree option to true
+parameters.setOptDegrees(true)
+
+// run the vertex-centric iteration, also passing the configuration parameters
+val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+
+// user-defined functions
+final class VertexUpdater {
+	...
+	// get the number of vertices
+	val numVertices = getNumberOfVertices
+	...
+}
+
+final class Messenger {
+	...
+	// retrieve the vertex out-degree
+	val outDegree = getOutDegree
+	...
+}
+
+{% endhighlight %}
+</div>
+</div>
 
 The following example illustrates the usage of the edge direction option. Vertices update their values to contain a list of all their in-neighbors.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
-
 Graph<Long, HashSet<Long>, Double> graph = ...
 
 // configure the iteration
@@ -594,29 +1018,35 @@ DataSet<Vertex<Long, HashSet<Long>>> result =
 			.getVertices();
 
 // user-defined functions
-public static final class VertexUpdater {
-	@Override
-    public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
-    	vertex.getValue().clear();
+public static final class VertexUpdater {...}
 
-    	for(long msg : messages) {
-    		vertex.getValue().add(msg);
-    	}
+public static final class Messenger {...}
 
-    	setNewVertexValue(vertex.getValue());
-    }
-}
+{% endhighlight %}
+</div>
 
-public static final class Messenger {
-	@Override
-    public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
-    	for (Edge<Long, Long> edge : getEdges()) {
-    		sendMessageTo(edge.getSource(), vertex.getId());
-    	}
-    }
-}
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val graph: Graph[Long, HashSet[Long], Double] = ...
+
+// configure the iteration
+val parameters = new VertexCentricConfiguration
+
+// set the messaging direction
+parameters.setDirection(EdgeDirection.IN)
+
+// run the vertex-centric iteration, also passing the configuration parameters
+val result = graph.runVertexCentricIteration(new VertexUpdater, new Messenger, maxIterations, parameters)
+			.getVertices
+
+// user-defined functions
+final class VertexUpdater {...}
+
+final class Messenger {...}
 
 {% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -646,6 +1076,8 @@ The resulting graph after the algorithm converges is shown below.
 
 To implement this example in Gelly GSA, the user only needs to call the `runGatherSumApplyIteration` method on the input graph and provide the `GatherFunction`, `SumFunction` and `ApplyFunction` UDFs. Iteration synchronization, grouping, value updates and convergence are handled by the system:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 // read the input graph
 Graph<Long, Double, Double> graph = ...
@@ -690,10 +1122,58 @@ private static final class UpdateDistance extends ApplyFunction<Long, Double, Do
 }
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// read the input graph
+val graph: Graph[Long, Double, Double] = ...
+
+// define the maximum number of iterations
+val maxIterations = 10
+
+// Execute the GSA iteration
+val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, new UpdateDistance, maxIterations)
+
+// Extract the vertices as the result
+val singleSourceShortestPaths = result.getVertices
+
+
+// - - -  UDFs - - - //
+
+// Gather
+final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+
+	override def gather(neighbor: Neighbor[Double, Double]): Double = {
+		neighbor.getNeighborValue + neighbor.getEdgeValue
+	}
+}
+
+// Sum
+final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+
+	override def sum(newValue: Double, currentValue: Double): Double = {
+		Math.min(newValue, currentValue)
+	}
+}
+
+// Apply
+final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+
+	override def apply(newDistance: Double, oldDistance: Double) = {
+		if (newDistance < oldDistance) {
+			setResult(newDistance)
+		}
+	}
+}
+
+{% endhighlight %}
+</div>
+</div>
 
 Note that `gather` takes a `Neighbor` type as an argument. This is a convenience type which simply wraps a vertex with its neighboring edge.
 
-For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java "GSAPageRank" %} and {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java "GSAConnectedComponents" %} examples of Gelly.
+For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java "GSAPageRank" %} and {% gh_link /flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java "GSAConnectedComponents" %} library methods of Gelly.
 
 [Back to top](#top)
 
@@ -719,6 +1199,8 @@ using the `setDirection()` method.
 
 The following example illustrates the usage of the number of vertices option.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 
 Graph<Long, Double, Double> graph = ...
@@ -757,8 +1239,51 @@ public static final class Apply {
 }
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, Double, Double] = ...
+
+// configure the iteration
+val parameters = new GSAConfiguration
+
+// set the number of vertices option to true
+parameters.setOptNumVertices(true)
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)
+
+// user-defined functions
+final class Gather {
+	...
+	// get the number of vertices
+	val numVertices = getNumberOfVertices
+	...
+}
+
+final class Sum {
+	...
+	// get the number of vertices
+	val numVertices = getNumberOfVertices
+	...
+}
+
+final class Apply {
+	...
+	// get the number of vertices
+	val numVertices = getNumberOfVertices
+	...
+}
+
+{% endhighlight %}
+</div>
+</div>
 
 The following example illustrates the usage of the edge direction option.
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 
 Graph<Long, HashSet<Long>, Double> graph = ...
@@ -775,6 +1300,25 @@ DataSet<Vertex<Long, HashSet<Long>>> result =
 			new Gather(), new Sum(), new Apply(), maxIterations, parameters)
 			.getVertices();
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val graph: Graph[Long, HashSet[Long], Double] = ...
+
+// configure the iteration
+val parameters = new GSAConfiguration
+
+// set the messaging direction
+parameters.setDirection(EdgeDirection.IN)
+
+// run the gather-sum-apply iteration, also passing the configuration parameters
+val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)
+			.getVertices
+{% endhighlight %}
+</div>
+</div>
 [Back to top](#top)
 
 ### Vertex-centric and GSA Comparison
@@ -794,6 +1338,8 @@ Graph Validation
 Gelly provides a simple utility for performing validation checks on input graphs. Depending on the application context, a graph may or may not be valid according to certain criteria. For example, a user might need to validate whether their graph contains duplicate edges or whether its structure is bipartite. In order to validate a graph, one can define a custom `GraphValidator` and implement its `validate()` method. `InvalidVertexIdsValidator` is Gelly's pre-defined validator. It checks that the edge set contains valid vertex IDs, i.e. that all edge IDs
 also exist in the vertex IDs set.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -809,6 +1355,26 @@ Graph<Long, Long, Long> graph = Graph.fromCollection(vertices, edges, env);
 graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); 
 
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create a list of vertices with IDs = {1, 2, 3, 4, 5}
+val vertices: List[Vertex[Long, Long]] = ...
+
+// create a list of edges with IDs = {(1, 2), (1, 3), (2, 4), (5, 6)}
+val edges: List[Edge[Long, Long]] = ...
+
+val graph = Graph.fromCollection(vertices, edges, env)
+
+// will return false: 6 is an invalid ID
+graph.validate(new InvalidVertexIdsValidator[Long, Long, Long])
+
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -828,6 +1394,8 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -842,6 +1410,24 @@ verticesWithCommunity.print();
 
 env.execute();
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val graph: Graph[Long, Long, NullValue] = ...
+
+// run Label Propagation for 30 iterations to detect communities on the input graph
+val verticesWithCommunity = graph.run(new LabelPropagation[Long](30)).getVertices
+
+// print the result
+verticesWithCommunity.print()
+
+env.execute()
+{% endhighlight %}
+</div>
+</div>
 
 [Back to top](#top)
 
@@ -955,6 +1541,3 @@ The computation **terminates** after a specified *maximum number of supersteps*
 </p>
 
 [Back to top](#top)
-
-
-