You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/08/22 21:39:34 UTC
[3/4] flink git commit: [FLINK-2451] [gelly] re-organized tests;
compare with collect() instead of temp files where possible
[FLINK-2451] [gelly] re-organized tests; compare with collect() instead of temp files where possible
This closes #1000
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f35988f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f35988f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f35988f
Branch: refs/heads/master
Commit: 8f35988fc3edfeda8044a6675d04f111596b8e31
Parents: d2d061c
Author: vasia <va...@apache.org>
Authored: Fri Aug 7 11:28:20 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Aug 22 20:46:49 2015 +0200
----------------------------------------------------------------------
.../example/GSASingleSourceShortestPaths.java | 2 +-
.../example/utils/CommunityDetectionData.java | 30 ++++
.../example/utils/LabelPropagationData.java | 110 ++++++++++++++
.../flink/graph/example/utils/PageRankData.java | 33 ++++-
.../flink/graph/test/GatherSumApplyITCase.java | 111 +++++++-------
.../test/example/CommunityDetectionITCase.java | 100 -------------
...ctedComponentsWithRandomisedEdgesITCase.java | 95 ------------
.../test/example/LabelPropagationITCase.java | 143 -------------------
.../graph/test/example/PageRankITCase.java | 78 ----------
.../SingleSourceShortestPathsITCase.java | 9 ++
.../test/library/CommunityDetectionITCase.java | 82 +++++++++++
...ctedComponentsWithRandomisedEdgesITCase.java | 95 ++++++++++++
.../test/library/LabelPropagationITCase.java | 78 ++++++++++
.../graph/test/library/PageRankITCase.java | 125 ++++++++++++++++
14 files changed, 611 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index 23a3a82..9ea8fe2 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -74,7 +74,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
// emit result
if(fileOutput) {
- singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
// since file sinks are lazy, we trigger the execution explicitly
env.execute("GSA Single Source Shortest Paths");
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
index 629f5ef..196de3a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -36,6 +36,11 @@ public class CommunityDetectionData {
public static final double DELTA = 0.5f;
+ public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + "2,6\n"
+ + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+
+ public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
@@ -61,5 +66,30 @@ public class CommunityDetectionData {
return env.fromCollection(edges);
}
+ public static DataSet<Edge<Long, Double>> getSimpleEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+ edges.add(new Edge<Long, Double>(1L, 5L, 4.0));
+ edges.add(new Edge<Long, Double>(2L, 6L, 5.0));
+ edges.add(new Edge<Long, Double>(6L, 7L, 6.0));
+ edges.add(new Edge<Long, Double>(6L, 8L, 7.0));
+ edges.add(new Edge<Long, Double>(7L, 8L, 8.0));
+
+ return env.fromCollection(edges);
+ }
+
private CommunityDetectionData() {}
+
+ public static DataSet<Edge<Long, Double>> getTieEdgeDataSet(ExecutionEnvironment env) {
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 5L, 1.0));
+
+ return env.fromCollection(edges);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
new file mode 100644
index 0000000..b70a9c4
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+public class LabelPropagationData {
+
+ public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,40\n" +
+ "5,40\n" +
+ "6,40\n" +
+ "7,40\n";
+
+ public static final String LABELS_WITH_TIE ="1,10\n" +
+ "2,10\n" +
+ "3,10\n" +
+ "4,10\n" +
+ "5,20\n" +
+ "6,20\n" +
+ "7,20\n" +
+ "8,20\n" +
+ "9,20\n";
+
+ private LabelPropagationData() {}
+
+ public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ vertices.add(new Vertex<Long, Long>(1l, 10l));
+ vertices.add(new Vertex<Long, Long>(2l, 10l));
+ vertices.add(new Vertex<Long, Long>(3l, 30l));
+ vertices.add(new Vertex<Long, Long>(4l, 40l));
+ vertices.add(new Vertex<Long, Long>(5l, 40l));
+ vertices.add(new Vertex<Long, Long>(6l, 40l));
+ vertices.add(new Vertex<Long, Long>(7l, 40l));
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+ edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance()));
+
+ return env.fromCollection(edges);
+ }
+
+ public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+ List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+ vertices.add(new Vertex<Long, Long>(1l, 10l));
+ vertices.add(new Vertex<Long, Long>(2l, 10l));
+ vertices.add(new Vertex<Long, Long>(3l, 10l));
+ vertices.add(new Vertex<Long, Long>(4l, 10l));
+ vertices.add(new Vertex<Long, Long>(5l, 0l));
+ vertices.add(new Vertex<Long, Long>(6l, 20l));
+ vertices.add(new Vertex<Long, Long>(7l, 20l));
+ vertices.add(new Vertex<Long, Long>(8l, 20l));
+ vertices.add(new Vertex<Long, Long>(9l, 20l));
+
+ return env.fromCollection(vertices);
+ }
+
+ public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+ edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance()));
+ edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance()));
+
+ return env.fromCollection(edges);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
index 077572e..c84808a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -18,6 +18,13 @@
package org.apache.flink.graph.example.utils;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
public class PageRankData {
public static final String EDGES = "2 1\n" +
@@ -31,12 +38,28 @@ public class PageRankData {
"3 5\n";
- public static final String RANKS_AFTER_3_ITERATIONS = "1 0.237\n" +
- "2 0.248\n" +
- "3 0.173\n" +
- "4 0.175\n" +
- "5 0.165\n";
+ public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+ "2,0.248\n" +
+ "3,0.173\n" +
+ "4,0.175\n" +
+ "5,0.165\n";
private PageRankData() {}
+
+ public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
+ edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+ edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
+
+ return env.fromCollection(edges);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index a883fa0..4b381b6 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -18,20 +18,21 @@
package org.apache.flink.graph.test;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAConnectedComponents;
-import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.flink.types.NullValue;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.File;
+import java.util.List;
@RunWith(Parameterized.class)
public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@@ -40,44 +41,29 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String edgesPath;
- private String resultPath;
private String expectedResult;
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
-
- File edgesFile = tempFolder.newFile();
- Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
-
- edgesPath = edgesFile.toURI().toString();
-
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
// --------------------------------------------------------------------------------------------
// Connected Components Test
// --------------------------------------------------------------------------------------------
@Test
public void testConnectedComponents() throws Exception {
- GSAConnectedComponents.main(new String[]{edgesPath, resultPath, "16"});
- expectedResult = "1 1\n" +
- "2 1\n" +
- "3 1\n" +
- "4 1\n" +
- "5 1\n" +
- "6 6\n" +
- "7 6\n";
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+ new InitMapperCC(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16))
+ .getVertices().collect();
+ expectedResult = "1,1\n" +
+ "2,1\n" +
+ "3,1\n" +
+ "4,1\n";
+
+ compareResultAsTuples(result, expectedResult);
}
// --------------------------------------------------------------------------------------------
@@ -86,26 +72,35 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Test
public void testSingleSourceShortestPaths() throws Exception {
- GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
- expectedResult = "1 0.0\n" +
- "2 12.0\n" +
- "3 13.0\n" +
- "4 47.0\n" +
- "5 48.0\n" +
- "6 Infinity\n" +
- "7 Infinity\n";
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+ new InitMapperSSSP(), env);
+
+ List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
+ .getVertices().collect();
+
+ expectedResult = "1,0.0\n" +
+ "2,12.0\n" +
+ "3,13.0\n" +
+ "4,47.0\n" +
+ "5,48.0\n";
+
+ compareResultAsTuples(result, expectedResult);
}
- // --------------------------------------------------------------------------------------------
- // Sample data
- // --------------------------------------------------------------------------------------------
+ @SuppressWarnings("serial")
+ private static final class InitMapperCC implements MapFunction<Long, Long> {
+ public Long map(Long value) {
+ return value;
+ }
+ }
- private static final String EDGES = "1 2 12.0\n" +
- "1 3 13.0\n" +
- "2 3 23.0\n" +
- "3 4 34.0\n" +
- "3 5 35.0\n" +
- "4 5 45.0\n" +
- "5 1 51.0\n" +
- "6 7 67.0\n";
-}
+ @SuppressWarnings("serial")
+ private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 0.0;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
deleted file mode 100644
index 1302424..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.CommunityDetection;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class CommunityDetectionITCase extends MultipleProgramsTestBase {
-
- private String edgesPath;
-
- private String resultPath;
-
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- public CommunityDetectionITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
-
- @Test
- public void testSingleIteration() throws Exception {
- /*
- * Test one iteration of the Simple Community Detection Example
- */
- final String edges = "1 2 1.0\n" + "1 3 2.0\n" + "1 4 3.0\n" + "1 5 4.0\n" + "2 6 5.0\n" +
- "6 7 6.0\n" + "6 8 7.0\n" + "7 8 8.0";
- edgesPath = createTempFile(edges);
-
- CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
- CommunityDetectionData.DELTA + ""});
-
- expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
- }
-
- @Test
- public void testTieBreaker() throws Exception {
- /*
- * Test one iteration of the Simple Community Detection Example where a tie must be broken
- */
-
- final String edges = "1 2 1.0\n" + "1 3 1.0\n" + "1 4 1.0\n" + "1 5 1.0";
- edgesPath = createTempFile(edges);
-
- CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
- CommunityDetectionData.DELTA + ""});
-
- expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
- }
-
-
- // -------------------------------------------------------------------------
- // Util methods
- // -------------------------------------------------------------------------
- private String createTempFile(final String rows) throws Exception {
- File tempFile = tempFolder.newFile();
- Files.write(rows, tempFile, Charsets.UTF_8);
- return tempFile.toURI().toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
deleted file mode 100644
index f2f3d8c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-import java.io.BufferedReader;
-
-@SuppressWarnings("serial")
-public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
-
- private static final long SEED = 9487520347802987L;
-
- private static final int NUM_VERTICES = 1000;
-
- private static final int NUM_EDGES = 10000;
-
- private String resultPath;
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempFilePath("results");
- }
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
- DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-
- DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
-
- DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
- DataSet<Vertex<Long, Long>> result = graph
- .run(new ConnectedComponentsAlgorithm(100)).getVertices();
-
- result.writeAsCsv(resultPath, "\n", " ");
- env.execute();
- }
-
- /**
- * A map function that takes a Long value and creates a 2-tuple out of it:
- * <pre>(Long value) -> (value, value)</pre>
- */
- public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
- @Override
- public Vertex<Long, Long> map(Long value) {
- return new Vertex<Long, Long>(value, value);
- }
- }
-
- @Override
- protected void postSubmit() throws Exception {
- for (BufferedReader reader : getResultReader(resultPath)) {
- ConnectedComponentsData.checkOddEvenResult(reader);
- }
- }
-
- public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
- public Edge<Long, NullValue> map(String value) {
- String[] nums = value.split(" ");
- return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
- NullValue.getInstance());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
deleted file mode 100644
index 858d06c..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagation;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationITCase extends MultipleProgramsTestBase {
-
- public LabelPropagationITCase(TestExecutionMode mode){
- super(mode);
- }
-
- private String resultPath;
- private String expectedResult;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expectedResult, resultPath);
- }
-
- @Test
- public void testSingleIteration() throws Exception {
- /*
- * Test one iteration of label propagation example with a simple graph
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 30\n" +
- "4 40\n" +
- "5 40\n" +
- "6 40\n" +
- "7 70\n";
-
- final String edges = "1 3\n" +
- "2 3\n" +
- "4 7\n" +
- "5 7\n" +
- "6 7\n" +
- "7 3\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,40\n" +
- "5,40\n" +
- "6,40\n" +
- "7,40\n";
- }
-
- @Test
- public void testTieBreaker() throws Exception {
- /*
- * Test the label propagation example where a tie must be broken
- */
-
- final String vertices = "1 10\n" +
- "2 10\n" +
- "3 10\n" +
- "4 10\n" +
- "5 0\n" +
- "6 20\n" +
- "7 20\n" +
- "8 20\n" +
- "9 20\n";
-
- final String edges = "1 5\n" +
- "2 5\n" +
- "3 5\n" +
- "4 5\n" +
- "6 5\n" +
- "7 5\n" +
- "8 5\n" +
- "9 5\n";
-
- String verticesPath = createTempFile(vertices);
- String edgesPath = createTempFile(edges);
-
- LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
-
- expectedResult = "1,10\n" +
- "2,10\n" +
- "3,10\n" +
- "4,10\n" +
- "5,20\n" +
- "6,20\n" +
- "7,20\n" +
- "8,20\n" +
- "9,20\n";
- }
-
- // -------------------------------------------------------------------------
- // Util methods
- // -------------------------------------------------------------------------
-
- private String createTempFile(final String rows) throws Exception {
- File tempFile = tempFolder.newFile();
- Files.write(rows, tempFile, Charsets.UTF_8);
- return tempFile.toURI().toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
deleted file mode 100644
index cde959f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import java.io.File;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.GSAPageRank;
-import org.apache.flink.graph.example.PageRank;
-import org.apache.flink.graph.example.utils.PageRankData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
- public PageRankITCase(TestExecutionMode mode){
- super(mode);
- }
-
- private String edgesPath;
- private String resultPath;
- private String expected;
-
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Before
- public void before() throws Exception{
- resultPath = tempFolder.newFile().toURI().toString();
-
- File edgesFile = tempFolder.newFile();
- Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
-
- edgesPath = edgesFile.toURI().toString();
- }
-
- @After
- public void after() throws Exception{
- compareKeyValuePairsWithDelta(expected, resultPath, "\t", 0.01);
- }
-
- @Test
- public void testPageRankWithThreeIterations() throws Exception {
- PageRank.main(new String[] {edgesPath, resultPath, "3"});
- expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
-
- @Test
- public void testGSAPageRankWithThreeIterations() throws Exception {
- GSAPageRank.main(new String[] {edgesPath, resultPath, "3"});
- expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index 2e68b0a..d8f8c8f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -20,6 +20,8 @@ package org.apache.flink.graph.test.example;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
import org.apache.flink.graph.example.SingleSourceShortestPaths;
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -65,6 +67,13 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
}
+ @Test
+ public void testGSASSSPExample() throws Exception {
+ GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+ edgesPath, resultPath, 10 + ""});
+ expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+ }
+
@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..104996e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.graph.library.CommunityDetection;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+ public CommunityDetectionITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private String expected;
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example
+ */
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+ CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ .getVertices().collect();
+
+ expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
+ compareResultAsTuples(result, expected);
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example where a tie must be broken
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+ CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+ .getVertices().collect();
+ expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
+ compareResultAsTuples(result, expected);
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitLabels implements MapFunction<Long, Long>{
+
+ public Long map(Long id) {
+ return id;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..ef4b467
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
+
+ private static final long SEED = 9487520347802987L;
+
+ private static final int NUM_VERTICES = 1000;
+
+ private static final int NUM_EDGES = 10000;
+
+ private String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempFilePath("results");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+ DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+
+ DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
+
+ DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+ DataSet<Vertex<Long, Long>> result = graph
+ .run(new ConnectedComponents(100)).getVertices();
+
+ result.writeAsCsv(resultPath, "\n", " ");
+ env.execute();
+ }
+
+ /**
+ * A map function that takes a Long value and creates a 2-tuple out of it:
+ * <pre>(Long value) -> (value, value)</pre>
+ */
+ public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
+ @Override
+ public Vertex<Long, Long> map(Long value) {
+ return new Vertex<Long, Long>(value, value);
+ }
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ for (BufferedReader reader : getResultReader(resultPath)) {
+ ConnectedComponentsData.checkOddEvenResult(reader);
+ }
+ }
+
+ public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
+ public Edge<Long, NullValue> map(String value) {
+ String[] nums = value.split(" ");
+ return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+ NullValue.getInstance());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..da36ef6
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.LabelPropagationData;
+import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+ public LabelPropagationITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String expectedResult;
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of label propagation example with a simple graph
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ LabelPropagationData.getDefaultVertexSet(env),
+ LabelPropagationData.getDefaultEdgeDataSet(env), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+ .getVertices().collect();
+
+ expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
+ compareResultAsTuples(result, expectedResult);
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test the label propagation example where a tie must be broken
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+ LabelPropagationData.getTieVertexSet(env),
+ LabelPropagationData.getTieEdgeDataSet(env), env);
+
+ List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
+ .getVertices().collect();
+
+ expectedResult = LabelPropagationData.LABELS_WITH_TIE;
+ compareResultAsTuples(result, expectedResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f35988f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
new file mode 100644
index 0000000..cc1132d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PageRankData;
+import org.apache.flink.graph.library.GSAPageRank;
+import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+ public PageRankITCase(TestExecutionMode mode){
+ super(mode);
+ }
+
+ private String expectedResult;
+
+ @Test
+ public void testPageRankWithThreeIterations() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
+
+ Graph<Long, Double, Double> networkWithWeights = inputGraph
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ List<Vertex<Long, Double>> result = networkWithWeights.run(new PageRank<Long>(0.85, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ @Test
+ public void testGSAPageRankWithThreeIterations() throws Exception {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+ PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+ DataSet<Tuple2<Long, Long>> vertexOutDegrees = inputGraph.outDegrees();
+
+ Graph<Long, Double, Double> networkWithWeights = inputGraph
+ .joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+ List<Vertex<Long, Double>> result = networkWithWeights.run(new GSAPageRank<Long>(0.85, 3))
+ .getVertices().collect();
+
+ compareWithDelta(result, expectedResult, 0.01);
+ }
+
+ private void compareWithDelta(List<Vertex<Long, Double>> result,
+ String expectedResult, double delta) {
+
+ String resultString = "";
+ for (Vertex<Long, Double> v : result) {
+ resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
+ }
+
+ expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+ String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+ String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
+
+ Arrays.sort(expected);
+ Arrays.sort(resultArray);
+
+ for (int i = 0; i < expected.length; i++) {
+ String[] expectedFields = expected[i].split(",");
+ String[] resultFields = resultArray[i].split(",");
+
+ double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+ double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+ Assert.assertTrue("Values differ by more than the permissible delta",
+ Math.abs(expectedPayLoad - resultPayLoad) < delta);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitMapper implements MapFunction<Long, Double> {
+ public Double map(Long value) {
+ return 1.0;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+ public Double map(Tuple2<Double, Long> value) {
+ return value.f0 / value.f1;
+ }
+ }
+}