You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/08/22 21:39:34 UTC

[3/4] flink git commit: [FLINK-2451] [gelly] re-organized tests; compare with collect() instead of temp files where possible

[FLINK-2451] [gelly] re-organized tests; compare with collect() instead of temp files where possible

This closes #1000


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f35988f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f35988f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f35988f

Branch: refs/heads/master
Commit: 8f35988fc3edfeda8044a6675d04f111596b8e31
Parents: d2d061c
Author: vasia <va...@apache.org>
Authored: Fri Aug 7 11:28:20 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:46:49 2015 +0200

----------------------------------------------------------------------
 .../example/GSASingleSourceShortestPaths.java   |   2 +-
 .../example/utils/CommunityDetectionData.java   |  30 ++++
 .../example/utils/LabelPropagationData.java     | 110 ++++++++++++++
 .../flink/graph/example/utils/PageRankData.java |  33 ++++-
 .../flink/graph/test/GatherSumApplyITCase.java  | 111 +++++++-------
 .../test/example/CommunityDetectionITCase.java  | 100 -------------
 ...ctedComponentsWithRandomisedEdgesITCase.java |  95 ------------
 .../test/example/LabelPropagationITCase.java    | 143 -------------------
 .../graph/test/example/PageRankITCase.java      |  78 ----------
 .../SingleSourceShortestPathsITCase.java        |   9 ++
 .../test/library/CommunityDetectionITCase.java  |  82 +++++++++++
 ...ctedComponentsWithRandomisedEdgesITCase.java |  95 ++++++++++++
 .../test/library/LabelPropagationITCase.java    |  78 ++++++++++
 .../graph/test/library/PageRankITCase.java      | 125 ++++++++++++++++
 14 files changed, 611 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index 23a3a82..9ea8fe2 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -74,7 +74,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 
 		// emit result
 		if(fileOutput) {
-			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
 
 			// since file sinks are lazy, we trigger the execution explicitly
 			env.execute("GSA Single Source Shortest Paths");

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
index 629f5ef..196de3a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -36,6 +36,11 @@ public class CommunityDetectionData {
 
 	public static final double DELTA = 0.5f;
 
+	public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + "2,6\n"
+			+ "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7"; 
+
+	public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+
 	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
 
 		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
@@ -61,5 +66,30 @@ public class CommunityDetectionData {
 		return env.fromCollection(edges);
 	}
 
+	public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+		edges.add(new Edge<Long, Double>(1L, 5L, 4.0));
+		edges.add(new Edge<Long, Double>(2L, 6L, 5.0));
+		edges.add(new Edge<Long, Double>(6L, 7L, 6.0));
+		edges.add(new Edge<Long, Double>(6L, 8L, 7.0));
+		edges.add(new Edge<Long, Double>(7L, 8L, 8.0));
+
+		return env.fromCollection(edges);
+	}
+
 	private CommunityDetectionData() {}
+
+	public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
new file mode 100644
index 0000000..b70a9c4
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+public class LabelPropagationData {
+	
+	public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,40\n" +
+			"5,40\n" +
+			"6,40\n" +
+			"7,40\n";
+
+	public static final String LABELS_WITH_TIE ="1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,10\n" +
+			"5,20\n" +
+			"6,20\n" +
+			"7,20\n" +
+			"8,20\n" +
+			"9,20\n";
+
+	private LabelPropagationData() {}
+
+	public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1l, 10l));
+		vertices.add(new Vertex<Long, Long>(2l, 10l));
+		vertices.add(new Vertex<Long, Long>(3l, 30l));
+		vertices.add(new Vertex<Long, Long>(4l, 40l));
+		vertices.add(new Vertex<Long, Long>(5l, 40l));
+		vertices.add(new Vertex<Long, Long>(6l, 40l));
+		vertices.add(new Vertex<Long, Long>(7l, 40l));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1l, 10l));
+		vertices.add(new Vertex<Long, Long>(2l, 10l));
+		vertices.add(new Vertex<Long, Long>(3l, 10l));
+		vertices.add(new Vertex<Long, Long>(4l, 10l));
+		vertices.add(new Vertex<Long, Long>(5l, 0l));
+		vertices.add(new Vertex<Long, Long>(6l, 20l));
+		vertices.add(new Vertex<Long, Long>(7l, 20l));
+		vertices.add(new Vertex<Long, Long>(8l, 20l));
+		vertices.add(new Vertex<Long, Long>(9l, 20l));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
index 077572e..c84808a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.graph.example.utils;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
 public class PageRankData {
 	
 	public static final String EDGES = "2	1\n" +
@@ -31,12 +38,28 @@ public class PageRankData {
 										"3	5\n";
 
 	
-	public static final String RANKS_AFTER_3_ITERATIONS = "1	0.237\n" +
-														"2	0.248\n" + 
-														"3	0.173\n" +
-														"4	0.175\n" +
-														"5	0.165\n";
+	public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+														"2,0.248\n" + 
+														"3,0.173\n" +
+														"4,0.175\n" +
+														"5,0.165\n";
 
 	private PageRankData() {}
+
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index a883fa0..4b381b6 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.graph.test;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAConnectedComponents;
-import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.flink.types.NullValue;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.File;
+import java.util.List;
 
 @RunWith(Parameterized.class)
 public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@@ -40,44 +41,29 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String edgesPath;
-	private String resultPath;
 	private String expectedResult;
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
-
-		edgesPath = edgesFile.toURI().toString();
-
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Connected Components Test
 	// --------------------------------------------------------------------------------------------
 
 	@Test
 	public void testConnectedComponents() throws Exception {
-		GSAConnectedComponents.main(new String[]{edgesPath, resultPath, "16"});
-		expectedResult = "1 1\n" +
-				"2 1\n" +
-				"3 1\n" +
-				"4 1\n" +
-				"5 1\n" +
-				"6 6\n" +
-				"7 6\n";
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+				new InitMapperCC(), env);
+
+        List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16))
+        		.getVertices().collect();
 
+		expectedResult = "1,1\n" +
+				"2,1\n" +
+				"3,1\n" +
+				"4,1\n";
+
+		compareResultAsTuples(result, expectedResult);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -86,26 +72,35 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testSingleSourceShortestPaths() throws Exception {
-		GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
-		expectedResult = "1 0.0\n" +
-				"2 12.0\n" +
-				"3 13.0\n" +
-				"4 47.0\n" +
-				"5 48.0\n" +
-				"6 Infinity\n" +
-				"7 Infinity\n";
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+				new InitMapperSSSP(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
+        		.getVertices().collect();
+
+		expectedResult = "1,0.0\n" +
+				"2,12.0\n" +
+				"3,13.0\n" +
+				"4,47.0\n" +
+				"5,48.0\n";
+
+		compareResultAsTuples(result, expectedResult);
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Sample data
-	// --------------------------------------------------------------------------------------------
+	@SuppressWarnings("serial")
+	private static final class InitMapperCC implements MapFunction<Long, Long> {
+		public Long map(Long value) {
+			return value;
+		}
+	}
 
-	private static final String EDGES = "1	2	12.0\n" +
-			"1	3	13.0\n" +
-			"2	3	23.0\n" +
-			"3	4	34.0\n" +
-			"3	5	35.0\n" +
-			"4	5	45.0\n" +
-			"5	1	51.0\n" +
-			"6	7	67.0\n";
-}
+	@SuppressWarnings("serial")
+	private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 0.0;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
deleted file mode 100644
index 1302424..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.CommunityDetection;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class CommunityDetectionITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public CommunityDetectionITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example
-		 */
-		final String edges = "1	2	1.0\n" + "1	3	2.0\n" + "1	4	3.0\n" + "1	5	4.0\n" + "2	6	5.0\n" +
-				"6	7	6.0\n" + "6	8	7.0\n" + "7	8	8.0";
-		edgesPath = createTempFile(edges);
-
-		CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
-				CommunityDetectionData.DELTA + ""});
-
-		expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
-		 */
-
-		final String edges = "1	2	1.0\n" + "1	3	1.0\n" + "1	4	1.0\n" + "1	5	1.0";
-		edgesPath = createTempFile(edges);
-
-		CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
-				CommunityDetectionData.DELTA + ""});
-
-		expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
-	}
-
-
-	// -------------------------------------------------------------------------
-	// Util methods
-	// -------------------------------------------------------------------------
-	private String createTempFile(final String rows) throws Exception {
-		File tempFile = tempFolder.newFile();
-		Files.write(rows, tempFile, Charsets.UTF_8);
-		return tempFile.toURI().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
deleted file mode 100644
index f2f3d8c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-import java.io.BufferedReader;
-
-@SuppressWarnings("serial")
-public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
-
-	private static final long SEED = 9487520347802987L;
-
-	private static final int NUM_VERTICES = 1000;
-
-	private static final int NUM_EDGES = 10000;
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-
-		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
-
-		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-		DataSet<Vertex<Long, Long>> result = graph
-				.run(new ConnectedComponentsAlgorithm(100)).getVertices();
-
-		result.writeAsCsv(resultPath, "\n", " ");
-		env.execute();
-	}
-
-	/**
-	 * A map function that takes a Long value and creates a 2-tuple out of it:
-	 * <pre>(Long value) -> (value, value)</pre>
-	 */
-	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
-		@Override
-		public Vertex<Long, Long> map(Long value) {
-			return new Vertex<Long, Long>(value, value);
-		}
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (BufferedReader reader : getResultReader(resultPath)) {
-			ConnectedComponentsData.checkOddEvenResult(reader);
-		}
-	}
-
-	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
-		public Edge<Long, NullValue> map(String value) {
-			String[] nums = value.split(" ");
-			return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
-					NullValue.getInstance());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
deleted file mode 100644
index 858d06c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagation;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationITCase extends MultipleProgramsTestBase {
-
-	public LabelPropagationITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of label propagation example with a simple graph
-		 */
-
-		final String vertices = "1	10\n" +
-				"2	10\n" +
-				"3	30\n" +
-				"4	40\n" +
-				"5	40\n" +
-				"6	40\n" +
-				"7	70\n";
-
-		final String edges = "1	3\n" +
-				"2	3\n" +
-				"4	7\n" +
-				"5	7\n" +
-				"6	7\n" +
-				"7	3\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
-		expectedResult = "1,10\n" +
-			"2,10\n" +
-			"3,10\n" +
-			"4,40\n" +
-			"5,40\n" +
-			"6,40\n" +
-			"7,40\n";
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test the label propagation example where a tie must be broken
-		 */
-
-		final String vertices = "1	10\n" +
-				"2	10\n" +
-				"3	10\n" +
-				"4	10\n" +
-				"5	0\n" +
-				"6	20\n" +
-				"7	20\n" +
-				"8	20\n" +
-				"9	20\n";
-
-		final String edges = "1	5\n" +
-				"2	5\n" +
-				"3	5\n" +
-				"4	5\n" +
-				"6	5\n" +
-				"7	5\n" +
-				"8	5\n" +
-				"9	5\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
-		expectedResult = "1,10\n" +
-				"2,10\n" +
-				"3,10\n" +
-				"4,10\n" +
-				"5,20\n" +
-				"6,20\n" +
-				"7,20\n" +
-				"8,20\n" +
-				"9,20\n";
-	}
-
-	// -------------------------------------------------------------------------
-	//  Util methods
-	// -------------------------------------------------------------------------
-
-	private String createTempFile(final String rows) throws Exception {
-		File tempFile = tempFolder.newFile();
-		Files.write(rows, tempFile, Charsets.UTF_8);
-		return tempFile.toURI().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
deleted file mode 100644
index cde959f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import java.io.File;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.GSAPageRank;
-import org.apache.flink.graph.example.PageRank;
-import org.apache.flink.graph.example.utils.PageRankData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
-	public PageRankITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	private String edgesPath;
-	private String resultPath;
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
-
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareKeyValuePairsWithDelta(expected, resultPath, "\t", 0.01);
-	}
-
-	@Test
-	public void testPageRankWithThreeIterations() throws Exception {
-		PageRank.main(new String[] {edgesPath, resultPath, "3"});
-		expected =  PageRankData.RANKS_AFTER_3_ITERATIONS;
-	}
-
-	@Test
-	public void testGSAPageRankWithThreeIterations() throws Exception {
-		GSAPageRank.main(new String[] {edgesPath, resultPath, "3"});
-		expected =  PageRankData.RANKS_AFTER_3_ITERATIONS;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index 2e68b0a..d8f8c8f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.graph.test.example;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
 import org.apache.flink.graph.example.SingleSourceShortestPaths;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -65,6 +67,13 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
         expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }
 
+    @Test
+    public void testGSASSSPExample() throws Exception {
+        String[] args = {String.valueOf(SingleSourceShortestPathsData.SRC_VERTEX_ID), edgesPath, resultPath, "10"};
+        GSASingleSourceShortestPaths.main(args); // the @After method then compares resultPath with expected
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
     @After
     public void after() throws Exception {
         compareResultsByLinesInMemory(expected, resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..104996e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetection;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+	// Tests for the CommunityDetection library method; each test collects the result vertices and compares the list with an expected string.
+	public CommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String expected; // expected result of the running test; assigned inside each test method
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example
+		 */
+		// InitLabels maps every vertex id to itself, so presumably each vertex starts with its own id as label.
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+		// Run the algorithm with arguments (1, DELTA) and pull the resulting vertices into a local list.
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+        		.getVertices().collect();
+
+		expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+		// Same run as in testSingleIteration, only on the edge set returned by getTieEdgeDataSet.
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+        		.getVertices().collect();
+		expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
+		compareResultAsTuples(result, expected);
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitLabels implements MapFunction<Long, Long>{
+		// Identity mapper handed to Graph.fromDataSet together with the edge data set.
+		public Long map(Long id) {
+			return id;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..ef4b467
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
+	// Runs the ConnectedComponents library method on a generated graph and checks the written result.
+	private static final long SEED = 9487520347802987L;
+
+	private static final int NUM_VERTICES = 1000;
+
+	private static final int NUM_EDGES = 10000;
+
+	private String resultPath; // set in preSubmit, written in testProgram, read back in postSubmit
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+		// One string per generated edge line; EdgeParser turns each into an Edge carrying the NullValue instance.
+		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
+		// IdAssigner gives every vertex its own id as value.
+		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+		DataSet<Vertex<Long, Long>> result = graph
+				.run(new ConnectedComponents(100)).getVertices();
+		// writeAsCsv(path, rowDelimiter, fieldDelimiter): one vertex per line, fields separated by a space.
+		result.writeAsCsv(resultPath, "\n", " ");
+		env.execute();
+	}
+
+	/**
+	 * A map function that takes a Long value and creates a Vertex with that value as both id and vertex value:
+	 * <pre>(Long value) -&gt; Vertex(value, value)</pre>
+	 */
+	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
+		@Override
+		public Vertex<Long, Long> map(Long value) {
+			return new Vertex<Long, Long>(value, value);
+		}
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+	// Parses one "source target" line (fields separated by a single space) into an Edge carrying NullValue.
+	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
+		public Edge<Long, NullValue> map(String value) {
+			String[] nums = value.split(" ");
+			return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+					NullValue.getInstance());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..da36ef6
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.LabelPropagationData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+	// Tests for the LabelPropagation library method; each test collects the result vertices and compares the list with an expected string.
+	public LabelPropagationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    private String expectedResult; // expected result of the running test; assigned inside each test method
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of label propagation example with a simple graph
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// Vertices and edges both come from the "default" data sets of LabelPropagationData.
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getDefaultVertexSet(env),
+				LabelPropagationData.getDefaultEdgeDataSet(env), env);
+		// Run LabelPropagation with argument 1 (one iteration, per the test name) and collect the vertices.
+        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+        		.getVertices().collect();
+
+		expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test the label propagation example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// Same flow as testSingleIteration, but on the "tie" vertex and edge sets of LabelPropagationData.
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getTieVertexSet(env),
+				LabelPropagationData.getTieEdgeDataSet(env), env);
+
+        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+        		.getVertices().collect();
+
+		expectedResult = LabelPropagationData.LABELS_WITH_TIE;
+		compareResultAsTuples(result, expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
new file mode 100644
index 0000000..cc1132d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PageRankData;
+import org.apache.flink.graph.library.GSAPageRank;
+import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	/** Exclusive upper bound on the absolute difference between an expected and a computed rank. */
+	private static final double DELTA = 0.01;
+
+	public PageRankITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	/** Checks the PageRank library method, run with arguments (0.85, 3), against the reference ranks. */
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		List<Vertex<Long, Double>> result = buildWeightedGraph(env).run(new PageRank<Long>(0.85, 3))
+				.getVertices().collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	/** Checks the GSAPageRank library method, run with arguments (0.85, 3), against the same reference ranks. */
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		List<Vertex<Long, Double>> result = buildWeightedGraph(env).run(new GSAPageRank<Long>(0.85, 3))
+				.getVertices().collect();
+
+		compareWithDelta(result, PageRankData.RANKS_AFTER_3_ITERATIONS, DELTA);
+	}
+
+	/**
+	 * Builds the input graph shared by both tests from the default PageRankData edges. InitMapper
+	 * supplies 1.0 for every vertex id (presumably the initial vertex value), and InitWeightsMapper
+	 * recomputes the edge values from the join of the edges with the out-degrees (first / second field).
+	 */
+	private Graph<Long, Double, Double> buildWeightedGraph(ExecutionEnvironment env) throws Exception {
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+		DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
+		return inputGraph.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+	}
+
+	/**
+	 * Asserts that result and expectedResult describe the same vertex ids with ranks within delta.
+	 * @param result computed vertices; each one is rendered as an "id,rank" line before comparing
+	 * @param expectedResult newline-separated "id,rank" lines
+	 * @param delta exclusive upper bound on the absolute difference between two ranks
+	 */
+	private void compareWithDelta(List<Vertex<Long, Double>> result,
+			String expectedResult, double delta) {
+		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+		String[] resultArray = new String[result.size()];
+		for (int i = 0; i < resultArray.length; i++) {
+			resultArray[i] = result.get(i).f0 + "," + result.get(i).f1;
+		}
+
+		// Sort both sides the same way so that corresponding lines end up at the same index.
+		Arrays.sort(expected);
+		Arrays.sort(resultArray);
+
+		// Without this check a short result throws ArrayIndexOutOfBoundsException and a long one passes.
+		Assert.assertEquals("Wrong number of vertices in the result", expected.length, resultArray.length);
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(",");
+			String[] resultFields = resultArray[i].split(",");
+			Assert.assertEquals("Vertex ids differ", expectedFields[0], resultFields[0]);
+			Assert.assertTrue("Values differ by more than the permissible delta",
+					Math.abs(Double.parseDouble(expectedFields[1]) - Double.parseDouble(resultFields[1])) < delta);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitMapper implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 1.0;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+		public Double map(Tuple2<Double, Long> value) {
+			return value.f0 / value.f1;
+		}
+	}
+}