You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:34 UTC

[05/10] flink git commit: [gelly] [refactoring] Removed Example end string from all gelly examples

[gelly] [refactoring] Removed Example end string from all gelly examples

Added Algorithm end string to the library methods

This closes #625


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f4039dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f4039dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f4039dc

Branch: refs/heads/master
Commit: 8f4039dcb326a1af276ac4b93ffe5ea4da3e19bc
Parents: 1479973
Author: andralungu <lu...@gmail.com>
Authored: Sun Apr 26 20:09:04 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:03 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/CommunityDetection.java | 140 ++++++++++++
 .../graph/example/ConnectedComponents.java      | 141 ++++++++++++
 .../example/ConnectedComponentsExample.java     | 141 ------------
 .../graph/example/EuclideanGraphExample.java    | 210 ------------------
 .../graph/example/EuclideanGraphWeighing.java   | 210 ++++++++++++++++++
 .../graph/example/GSAConnectedComponents.java   | 176 +++++++++++++++
 .../example/GSAConnectedComponentsExample.java  | 176 ---------------
 .../example/GSASingleSourceShortestPaths.java   | 178 +++++++++++++++
 .../GSASingleSourceShortestPathsExample.java    | 178 ---------------
 .../graph/example/JaccardSimilarityMeasure.java | 214 +++++++++++++++++++
 .../JaccardSimilarityMeasureExample.java        | 214 -------------------
 .../flink/graph/example/LabelPropagation.java   | 170 +++++++++++++++
 .../graph/example/LabelPropagationExample.java  | 170 ---------------
 .../flink/graph/example/MusicProfiles.java      |   4 +-
 .../apache/flink/graph/example/PageRank.java    | 149 +++++++++++++
 .../flink/graph/example/PageRankExample.java    | 149 -------------
 .../SimpleCommunityDetectionExample.java        | 140 ------------
 .../example/SingleSourceShortestPaths.java      | 133 ++++++++++++
 .../SingleSourceShortestPathsExample.java       | 133 ------------
 .../example/utils/CommunityDetectionData.java   |  65 ++++++
 .../utils/ConnectedComponentsDefaultData.java   |  52 +++++
 .../utils/ConnectedComponentsExampleData.java   |  52 -----
 .../utils/EdgeWithLongIdNullValueParser.java    |  33 ---
 .../graph/example/utils/EuclideanGraphData.java |  10 +-
 .../utils/SimpleCommunityDetectionData.java     |  65 ------
 .../utils/SingleSourceShortestPathsData.java    |   4 +
 .../library/CommunityDetectionAlgorithm.java    | 172 +++++++++++++++
 .../graph/library/ConnectedComponents.java      |  88 --------
 .../library/ConnectedComponentsAlgorithm.java   |  88 ++++++++
 .../flink/graph/library/LabelPropagation.java   | 111 ----------
 .../library/LabelPropagationAlgorithm.java      | 114 ++++++++++
 .../apache/flink/graph/library/PageRank.java    | 100 ---------
 .../flink/graph/library/PageRankAlgorithm.java  | 104 +++++++++
 .../graph/library/SimpleCommunityDetection.java | 172 ---------------
 .../library/SingleSourceShortestPaths.java      | 108 ----------
 .../SingleSourceShortestPathsAlgorithm.java     | 111 ++++++++++
 .../flink/graph/test/GatherSumApplyITCase.java  |  10 +-
 .../test/example/CommunityDetectionITCase.java  | 100 +++++++++
 .../test/example/ConnectedComponentsITCase.java |  10 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   4 +-
 .../example/EuclideanGraphExampleITCase.java    |  77 -------
 .../example/EuclideanGraphWeighingITCase.java   |  77 +++++++
 .../JaccardSimilarityMeasureExampleITCase.java  |  72 -------
 .../example/JaccardSimilarityMeasureITCase.java |  72 +++++++
 .../example/LabelPropagationExampleITCase.java  | 143 -------------
 .../test/example/LabelPropagationITCase.java    | 143 +++++++++++++
 .../example/SimpleCommunityDetectionITCase.java | 100 ---------
 .../SingleSourceShortestPathsITCase.java        |   4 +-
 48 files changed, 2634 insertions(+), 2653 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
new file mode 100644
index 0000000..f9434d3
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetectionAlgorithm;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.CommunityDetectionAlgorithm}
+ * library method:
+ * <ul>
+ * 	<li> with the edge data set given as a parameter
+ * 	<li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
+ * 1-2 with weight 1.0 and 1-3 with weight 2.0.
+ *
+ * Usage <code>CommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.CommunityDetectionData}
+ */
+public class CommunityDetection implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// set up the graph
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+		Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, Long>() {
+					@Override
+					public Long map(Long label) throws Exception {
+						return label;
+					}
+				}, env);
+
+		// the result is in the form of <vertexId, communityId>, where the communityId is the label
+		// which the vertex converged to
+		DataSet<Vertex<Long, Long>> communityVertices =
+				graph.run(new CommunityDetectionAlgorithm(maxIterations, delta)).getVertices();
+
+		// emit result
+		if (fileOutput) {
+			communityVertices.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			communityVertices.print();
+		}
+
+		env.execute("Executing Community Detection Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Community Detection";
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static Integer maxIterations = CommunityDetectionData.MAX_ITERATIONS;
+	private static Double delta = CommunityDetectionData.DELTA;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage CommunityDetection <edge path> <output path> " +
+						"<num iterations> <delta>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+			delta = Double.parseDouble(args[3]);
+
+		} else {
+			System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage CommunityDetection <edge path> <output path> " +
+					"<num iterations> <delta>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return CommunityDetectionData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
new file mode 100644
index 0000000..b7c9045
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponentsAlgorithm}
+ * library method:
+ * <ul>
+ * 	<li> with the edge data set given as a parameter
+ * 	<li> with default data
+ * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; </code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
+ */
+public class ConnectedComponents implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
+			@Override
+			public Long map(Long value) throws Exception {
+				return value;
+			}
+		}, env);
+
+		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
+				.run(new ConnectedComponentsAlgorithm(maxIterations)).getVertices();
+
+		// emit result
+		if (fileOutput) {
+			verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			verticesWithMinIds.print();
+		}
+
+		env.execute("Connected Components Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Connected Components Example";
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+						"<num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+
+		} else {
+			System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+					"<num iterations>");
+		}
+
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		} else {
+			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
deleted file mode 100644
index a185a70..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponentsExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use the {@link org.apache.flink.graph.library.ConnectedComponents}
- * library method:
- * <ul>
- * 	<li> with the edge data set given as a parameter
- * 	<li> with default data
- * </ul>
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.ConnectedComponentsExampleData}
- */
-public class ConnectedComponentsExample implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		}, env);
-
-		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-				.run(new ConnectedComponents(maxIterations)).getVertices();
-
-		// emit result
-		if (fileOutput) {
-			verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			verticesWithMinIds.print();
-		}
-
-		env.execute("Connected Components Example");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Connected Components Example";
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static Integer maxIterations = ConnectedComponentsExampleData.MAX_ITERATIONS;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-						"<num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-
-		} else {
-			System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-					"<num iterations>");
-		}
-
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		} else {
-			return ConnectedComponentsExampleData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
deleted file mode 100644
index fa08084..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphExample.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-
-import java.io.Serializable;
-
-/**
- * Given a directed, unweighted graph, with vertex values representing points in a plan,
- * return a weighted graph where the edge weights are equal to the Euclidean distance between the
- * src and the trg vertex values.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * 	<li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
- * 	the value being formed of two doubles separated by a comma.
- * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
- * 	<li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
- * 	Edges themselves are separated by newlines.
- * 	For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
- * </ul>
- * </p>
- *
- * Usage <code>EuclideanGraphExample &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
- */
-@SuppressWarnings("serial")
-public class EuclideanGraphExample implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
-
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// the edge value will be the Euclidean distance between its src and trg vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
-				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
-
-					@Override
-					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
-							throws Exception {
-
-						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
-						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
-
-						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
-								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
-					}
-				});
-
-		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
-				new MapFunction<Tuple2<Double, Double>, Double>() {
-
-					@Override
-					public Double map(Tuple2<Double, Double> distance) throws Exception {
-						return distance.f1;
-					}
-				});
-
-		// retrieve the edges from the final result
-		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
-
-		// emit result
-		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			result.print();
-		}
-
-		env.execute("Euclidean Graph Example");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Weighing a graph by computing the Euclidean distance " +
-				"between its vertices";
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * A simple two-dimensional point.
-	 */
-	public static class Point implements Serializable {
-
-		public double x, y;
-
-		public Point() {}
-
-		public Point(double x, double y) {
-			this.x = x;
-			this.y = y;
-		}
-
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
-		}
-
-		@Override
-		public String toString() {
-			return x + " " + y;
-		}
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static String verticesInputPath = null;
-
-	private static String edgesInputPath = null;
-
-	private static String outputPath = null;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if (args.length == 3) {
-				fileOutput = true;
-				verticesInputPath = args[0];
-				edgesInputPath = args[1];
-				outputPath = args[2];
-			} else {
-				System.out.println("Executing Euclidean Graph example with default parameters and built-in default data.");
-				System.out.println("Provide parameters to read input data from files.");
-				System.out.println("See the documentation for the correct format of input files.");
-				System.err.println("Usage: EuclideanGraphExample <input vertices path> <input edges path>" +
-						" <output path>");
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(verticesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
-
-						@Override
-						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
-							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
-						}
-					});
-		} else {
-			return EuclideanGraphData.getDefaultVertexDataSet(env);
-		}
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-
-						@Override
-						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
-							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
-						}
-					});
-		} else {
-			return EuclideanGraphData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
new file mode 100644
index 0000000..565ef69
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * 	<li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
+ * 	the value being formed of two doubles separated by a comma.
+ * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
+ * 	<li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphWeighing implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// the edge value will be the Euclidean distance between its src and trg vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
+				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+					@Override
+					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+							throws Exception {
+
+						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+
+						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+					}
+				});
+
+		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> distance) throws Exception {
+						return distance.f1;
+					}
+				});
+
+		// retrieve the edges from the final result
+		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Euclidean Graph Weighing Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Weighing a graph by computing the Euclidean distance " +
+				"between its vertices";
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if (args.length == 3) {
+				fileOutput = true;
+				verticesInputPath = args[0];
+				edgesInputPath = args[1];
+				outputPath = args[2];
+			} else {
+				System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
+				System.out.println("Provide parameters to read input data from files.");
+				System.out.println("See the documentation for the correct format of input files.");
+				System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
+						" <output path>");
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(verticesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
+
+						@Override
+						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
+							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
new file mode 100755
index 0000000..30855e4
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponents.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
+ */
+public class GSAConnectedComponents implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Long, NullValue> result =
+				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			connectedComponents.print();
+		}
+
+		env.execute("GSA Connected Components");
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices	implements MapFunction<Long, Long> {
+
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 3) {
+				System.err.println("Usage: GSAConnectedComponents <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAConnectedComponents <edge path> <result path> <max iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// Generates 3 components of size 2
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
deleted file mode 100755
index 7c39123..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
- */
-public class GSAConnectedComponentsExample implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
-
-		// Execute the GSA iteration
-		Graph<Long, Long, NullValue> result =
-				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
-						new UpdateComponentId(), maxIterations);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			connectedComponents.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			connectedComponents.print();
-		}
-
-		env.execute("GSA Connected Components");
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices	implements MapFunction<Long, Long> {
-
-		public Long map(Long vertexId) {
-			return vertexId;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Connected Components UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-
-		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getNeighborValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-
-		public Long sum(Long newValue, Long currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
-
-		public void apply(Long summedValue, Long origValue) {
-			if (summedValue < origValue) {
-				setResult(summedValue);
-			}
-		}
-	};
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-
-	private static int maxIterations = 16;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if (args.length != 3) {
-				System.err.println("Usage: GSAConnectedComponentsExample <edge path> " +
-						"<result path> <max iterations>");
-				return false;
-			}
-
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-		} else {
-			System.out.println("Executing GSA Connected Components example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAConnectedComponentsExample <edge path> <result path> <max iterations>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		}
-
-		// Generates 3 components of size 2
-		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-			@Override
-			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
-				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
-			}
-		});
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Connected Components";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..bbc344f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Double, Double> result = graph
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if(fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+		env.execute("GSA Single Source Shortest Paths");
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
+
+		private long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
+			}
+			else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing GSASingle Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Single Source Shortest Paths";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
deleted file mode 100755
index 75cbd78..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
- */
-public class GSASingleSourceShortestPathsExample implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
-
-		// Execute the GSA iteration
-		Graph<Long, Double, Double> result = graph
-				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
-						new UpdateDistance(), maxIterations);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
-
-		// emit result
-		if(fileOutput) {
-			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			singleSourceShortestPaths.print();
-		}
-
-		env.execute("GSA Single Source Shortest Paths Example");
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices implements MapFunction<Long, Double>{
-
-		private long srcId;
-
-		public InitVertices(long srcId) {
-			this.srcId = srcId;
-		}
-
-		public Double map(Long id) {
-			if (id.equals(srcId)) {
-				return 0.0;
-			}
-			else {
-				return Double.POSITIVE_INFINITY;
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Single Source Shortest Path UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
-
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
-
-		public Double sum(Double newValue, Double currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
-
-		public void apply(Double newDistance, Double oldDistance) {
-			if (newDistance < oldDistance) {
-				setResult(newDistance);
-			}
-		}
-	};
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-
-	private static Long srcVertexId = 1l;
-
-	private static String edgesInputPath = null;
-
-	private static String outputPath = null;
-
-	private static int maxIterations = 5;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			srcVertexId = Long.parseLong(args[0]);
-			edgesInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-				System.out.println("Executing GSASingle Source Shortest Paths example "
-						+ "with default parameters and built-in default data.");
-				System.out.println("  Provide parameters to read input data from files.");
-				System.out.println("  See the documentation for the correct format of input files.");
-				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-		}
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Single Source Shortest Paths";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
new file mode 100644
index 0000000..dddaf41
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * 	Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasure implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+				graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+
+		// the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
+		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+				.map(new WeighEdgesMapper());
+
+		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> value) throws Exception {
+						return value.f1;
+					}
+				}).getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Executing Jaccard Similarity Measure");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex Jaccard Similarity Measure";
+	}
+
+	/**
+	 * Each vertex will have a HashSet containing its neighbor ids as value.
+	 */
+	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
+														Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
+
+			HashSet<Long> neighborsHashSet = new HashSet<Long>();
+			long vertexId = -1;
+
+			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+				neighborsHashSet.add(getNeighborID(edge));
+				vertexId = edge.f0;
+			}
+			out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
+		}
+	}
+
+	/**
+	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
+	 *
+	 * Consider the edge x-y
+	 * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
+	 * sizeX+sizeY = union + intersection of neighborhoods
+	 * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
+	 * The intersection can then be deduced.
+	 *
+	 * The Jaccard similarity coefficient is then, the intersection/union.
+	 */
+	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
+			Tuple3<Long, Long, Double>> {
+
+		@Override
+		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
+				throws Exception {
+
+			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+			long unionPlusIntersection = source.getValue().size() + target.getValue().size();
+			// within a HashSet, all elements are distinct
+			source.getValue().addAll(target.getValue());
+			// the source value contains the union
+			long union = source.getValue().size();
+			long intersection = unionPlusIntersection - union;
+
+			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
+		}
+	}
+
+	/**
+	 * Extracts the neighbor id from a (vertex id, incident edge) pair.
+	 * @param edge the vertex id (f0) paired with one of its incident edges (f1)
+	 * @return the edge's target if its source is the vertex in f0, otherwise the edge's source
+	 */
+	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+		// ids are boxed Longs: equals() compares values, == would compare references
+		if(edge.f1.getSource().equals(edge.f0)) {
+			return edge.f1.getTarget();
+		}
+		return edge.f1.getSource();
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		// fileOutput is only set when <edge path> and <output path> were given on the command line
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0d); // initial weight; autoboxed
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
deleted file mode 100644
index 2783a29..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.EdgesFunction;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
- * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size
- * of the union of neighbor sets - for the src and target vertices.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <br>
- * 	Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
- * 	Edges themselves are separated by newlines.
- * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3.
- * </p>
- *
- * Usage <code> JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
- */
-@SuppressWarnings("serial")
-public class JaccardSimilarityMeasureExample implements ProgramDescription {
-
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
-
-		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
-				graph.groupReduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
-
-		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
-
-		// the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
-		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
-				.map(new WeighEdgesMapper());
-
-		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
-				new MapFunction<Tuple2<Double, Double>, Double>() {
-
-					@Override
-					public Double map(Tuple2<Double, Double> value) throws Exception {
-						return value.f1;
-					}
-				}).getEdges();
-
-		// emit result
-		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", ",");
-		} else {
-			result.print();
-		}
-
-		env.execute("Executing Jaccard Similarity Measure");
-	}
-
-	@Override
-	public String getDescription() {
-		return "Vertex Jaccard Similarity Measure";
-	}
-
-	/**
-	 * Each vertex will have a HashSet containing its neighbor ids as value.
-	 */
-	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
-
-		@Override
-		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
-														Collector<Vertex<Long, HashSet<Long>>> out) throws Exception {
-
-			HashSet<Long> neighborsHashSet = new HashSet<Long>();
-			long vertexId = -1;
-
-			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
-				neighborsHashSet.add(getNeighborID(edge));
-				vertexId = edge.f0;
-			}
-			out.collect(new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet));
-		}
-	}
-
-	/**
-	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
-	 *
-	 * Consider the edge x-y
-	 * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
-	 * sizeX+sizeY = union + intersection of neighborhoods
-	 * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
-	 * The intersection can then be deduced.
-	 *
-	 * The Jaccard similarity coefficient is then, the intersection/union.
-	 */
-	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
-			Tuple3<Long, Long, Double>> {
-
-		@Override
-		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
-				throws Exception {
-
-			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
-			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
-
-			long unionPlusIntersection = source.getValue().size() + target.getValue().size();
-			// within a HashSet, all elements are distinct
-			source.getValue().addAll(target.getValue());
-			// the source value contains the union
-			long union = source.getValue().size();
-			long intersection = unionPlusIntersection - union;
-
-			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
-		}
-	}
-
-	/**
-	 * Helper method that extracts the neighborId given an edge.
-	 * @param edge
-	 * @return
-	 */
-	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
-		if(edge.f1.getSource() == edge.f0) {
-			return edge.f1.getTarget();
-		} else {
-			return edge.f1.getSource();
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 2) {
-				System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-		} else {
-			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
-		}
-
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-						@Override
-						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
-							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
-						}
-					});
-		} else {
-			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
new file mode 100644
index 0000000..4012a4e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagation.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.LabelPropagationAlgorithm;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This example uses the label propagation algorithm to detect communities by
+ * propagating labels. Initially, each vertex is assigned its id as its label.
+ * The vertices iteratively propagate their labels to their neighbors and adopt
+ * the most frequent label among their neighbors. The algorithm converges when
+ * no vertex changes value or the maximum number of iterations have been
+ * reached.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs
+ * in the following format: <code>&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;</code>.
+ *
+ * The vertices input file is expected to contain one vertex per line, with long IDs
+ * and long vertex values, in the following format: <code>&lt;vertexID&gt;\t&lt;vertexValue&gt;</code>.
+ *
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ */
+public class LabelPropagation implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// Set up the execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// Set up the graph
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges,	env);
+
+		// Set up the program
+		DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
+				new LabelPropagationAlgorithm<Long>(maxIterations)).getVertices();
+
+		// Emit results
+		if(fileOutput) {
+			verticesWithCommunity.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			verticesWithCommunity.print();
+		}
+
+		// Execute the program
+		env.execute("Label Propagation Example");
+	}
+
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static long numVertices = 100;
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing LabelPropagation example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: LabelPropagation <vertex path> <edge path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(vertexInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new Tuple2ToVertexMap<Long, Long>());
+		}
+
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, Long>>() {
+					public Vertex<Long, Long> map(Long l) throws Exception {
+						return new Vertex<Long, Long>(l, l);
+					}
+				});
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, NullValue>> out) {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
+	}
+
+	@Override
+	public String getDescription() {
+		return "Label Propagation Example";
+	}
+}
\ No newline at end of file