You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/06/25 23:25:42 UTC

[4/5] flink git commit: [FLINK-2271] [FLINK-1522] [gelly] changed PageRank example to expect an unweighted edge list added PageRank and MusicProfiles tests got rid of unused suppress warning annotations in Graph and JaccardSimilarityMeasure changed GSAPageRank example to expect an unweighted edge list

[FLINK-2271] [FLINK-1522] [gelly] changed PageRank example to expect an unweighted edge list
added PageRank and MusicProfiles tests
got rid of unused suppress warning annotations in Graph and JaccardSimilarityMeasure
changed GSAPageRank example to expect an unweighted edge list

This closes #865


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5bbdbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5bbdbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5bbdbf

Branch: refs/heads/master
Commit: fe5bbdbfbdefee3c1b150c661ff3292769c030bb
Parents: cad8510
Author: vasia <va...@apache.org>
Authored: Wed Jun 24 18:09:13 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Jun 25 22:58:17 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java |   3 -
 .../apache/flink/graph/example/GSAPageRank.java |  13 ++-
 .../graph/example/JaccardSimilarityMeasure.java |   2 -
 .../apache/flink/graph/example/PageRank.java    |  16 +--
 .../graph/example/utils/MusicProfilesData.java  |  23 ++++-
 .../flink/graph/example/utils/PageRankData.java |  42 ++++++++
 .../flink/graph/test/GatherSumApplyITCase.java  |  17 ----
 .../graph/test/example/MusicProfilesITCase.java | 101 +++++++++++++++++++
 .../graph/test/example/PageRankITCase.java      |  78 ++++++++++++++
 9 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 5e13ce1..a7c75bc 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1025,7 +1025,6 @@ public class Graph<K, VV, EV> {
 	 * @param vertex the vertex to be added
 	 * @return the new graph containing the existing vertices as well as the one just added
 	 */
-	@SuppressWarnings("unchecked")
 	public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
 		List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>();
 		newVertex.add(vertex);
@@ -1040,7 +1039,6 @@ public class Graph<K, VV, EV> {
 	 * @param verticesToAdd the list of vertices to add
 	 * @return the new graph containing the existing and newly added vertices
 	 */
-	@SuppressWarnings("unchecked")
 	public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
 		// Add the vertices
 		DataSet<Vertex<K, VV>> newVertices = this.vertices.union(this.context.fromCollection(verticesToAdd)).distinct();
@@ -1074,7 +1072,6 @@ public class Graph<K, VV, EV> {
 	 * @param newEdges the data set of edges to be added
 	 * @return a new graph containing the existing edges plus the newly added edges.
 	 */
-	@SuppressWarnings("unchecked")
 	public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
 
 		DataSet<Edge<K,EV>> newEdgesDataSet = this.context.fromCollection(newEdges);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
index 47f67b8..45d4555 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAPageRank.java
@@ -31,14 +31,13 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.util.Collector;
 
 /**
  * This example implements a simple PageRank algorithm, using a gather-sum-apply iteration.
  *
- * The edges input file is expected to contain one edge per line, with long IDs and double
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ * The edges input file is expected to contain one edge per line, with long IDs and no
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
  *
  * If no arguments are provided, the example runs with a random graph of 10 vertices
  * and random edge weights.
@@ -187,8 +186,12 @@ public class GSAPageRank implements ProgramDescription {
 			return env.readCsvFile(edgeInputPath)
 					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						public Edge<Long, Double> map(Tuple2<Long, Long> input) {
+							return new Edge<Long, Double>(input.f0, input.f1, 1.0);
+						}
+					}).withForwardedFields("f0; f1");
 		}
 
 		return env.generateSequence(1, numPages).flatMap(

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
index be241ce..79de407 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -114,7 +114,6 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 	/**
 	 * Each vertex will have a HashSet containing its neighbor ids as value.
 	 */
-	@SuppressWarnings("serial")
 	private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
 
 		@Override
@@ -136,7 +135,6 @@ public class JaccardSimilarityMeasure implements ProgramDescription {
 	 *
 	 * The Jaccard similarity coefficient is then, the intersection/union.
 	 */
-	@SuppressWarnings("serial")
 	private static final class ComputeJaccard implements
 			MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
index e4ad13f..10b4be4 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRank.java
@@ -28,17 +28,15 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.PageRankAlgorithm;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.util.Collector;
 
 /**
  * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
  *
- * The edges input file is expected to contain one edge per line, with long IDs and double
- * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ * The edges input file is expected to contain one edge per line, with long IDs and no
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>".
  *
- * If no arguments are provided, the example runs with a random graph of 10 vertices
- * and random edge weights.
+ * If no arguments are provided, the example runs with a random graph of 10 vertices.
  *
  */
 public class PageRank implements ProgramDescription {
@@ -131,8 +129,12 @@ public class PageRank implements ProgramDescription {
 			return env.readCsvFile(edgeInputPath)
 					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						public Edge<Long, Double> map(Tuple2<Long, Long> input) {
+							return new Edge<Long, Double>(input.f0, input.f1, 1.0);
+						}
+					}).withForwardedFields("f0; f1");
 		}
 
 		return env.generateSequence(1, numPages).flatMap(

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
index 0a7162d..6b96372 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -79,5 +79,26 @@ public class MusicProfilesData {
 		errors.add("ERROR: <song_15 track_15> Black Trees");
 		return env.fromCollection(errors);
 	}
-}
 
+	public static final String USER_SONG_TRIPLETS = "user_1	song_1	100\n" + "user_1	song_5	200\n"
+			+ "user_2	song_1	10\n" + "user_2	song_4	20\n"
+			+ "user_3	song_2	3\n"
+			+ "user_4	song_2	1\n" + "user_4	song_3	2\n"
+			+ "user_5	song_3	30";
+
+	public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
+
+	public static final String MAX_ITERATIONS = "2";
+
+	public static final String TOP_SONGS_RESULT = "user_1	song_1\n" +
+								"user_2	song_4\n" +
+								"user_3	song_2\n" +
+								"user_4	song_3\n" +
+								"user_5	song_3";
+
+	public static final String COMMUNITIES_RESULT = "user_1	1\n" +
+								"user_2	1\n" +
+								"user_3	3\n" +
+								"user_4	3\n" +
+								"user_5	4";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
new file mode 100644
index 0000000..077572e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+public class PageRankData {
+	
+	public static final String EDGES = "2	1\n" +
+										"5	2\n" + 
+										"5	4\n" +
+										"4	3\n" +
+										"4	2\n" +
+										"1	4\n" +
+										"1	2\n" +
+										"1	3\n" +
+										"3	5\n";
+
+	
+	public static final String RANKS_AFTER_3_ITERATIONS = "1	0.237\n" +
+														"2	0.248\n" + 
+														"3	0.173\n" +
+														"4	0.175\n" +
+														"5	0.165\n";
+
+	private PageRankData() {}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index ca40323..a883fa0 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAPageRank;
 import org.apache.flink.graph.example.GSAConnectedComponents;
 import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -98,22 +97,6 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Page Rank Test
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testPageRank() throws Exception {
-		GSAPageRank.main(new String[]{edgesPath, resultPath, "16"});
-		expectedResult = "1	7.47880014315678E21\n" +
-				"2	1.6383884499619055E21\n" +
-				"3	3.044048626469292E21\n" +
-				"4	1.6896936994425786E21\n" +
-				"5	4.214827876275491E21\n" +
-				"6	1.0\n" +
-				"7	8.157142857142858";
-	}
-
-	// --------------------------------------------------------------------------------------------
 	//  Sample data
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
new file mode 100644
index 0000000..0410d41
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.MusicProfiles;
+import org.apache.flink.graph.example.utils.MusicProfilesData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+@RunWith(Parameterized.class)
+public class MusicProfilesITCase extends MultipleProgramsTestBase {
+
+	private String tripletsPath;
+
+	private String mismatchesPath;
+
+	private String topSongsResultPath;
+
+	private String communitiesResultPath;
+
+	private String expectedTopSongs;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public MusicProfilesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		topSongsResultPath = tempFolder.newFile().toURI().toString();
+		communitiesResultPath = tempFolder.newFile().toURI().toString();
+
+		File tripletsFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8);
+		tripletsPath = tripletsFile.toURI().toString();
+
+		File mismatchesFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8);
+		mismatchesPath = mismatchesFile.toURI().toString();
+	}
+
+	@Test
+	public void testMusicProfilesExample() throws Exception {
+		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath,
+				MusicProfilesData.MAX_ITERATIONS + ""});
+		expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
+
+		ArrayList<String> list = new ArrayList<String>();
+		readAllResultLines(list, communitiesResultPath, new String[]{}, false);
+
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
+
+		// check that user_1 and user_2 are in the same community
+		Assert.assertEquals("users 1 and 2 are not in the same community",
+				result[0].substring(7), result[1].substring(7));
+
+		// check that user_3, user_4 and user_5 are in the same community
+		Assert.assertEquals("users 3 and 4 are not in the same community",
+				result[2].substring(7), result[3].substring(7));
+		Assert.assertEquals("users 4 and 5 are not in the same community",
+				result[3].substring(7), result[4].substring(7));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5bbdbf/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
new file mode 100644
index 0000000..544cc66
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/PageRankITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import java.io.File;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.example.GSAPageRank;
+import org.apache.flink.graph.example.PageRank;
+import org.apache.flink.graph.example.utils.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	private String edgesPath;
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(PageRankData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareKeyValueParisWithDelta(expected, resultPath, "\t", 0.01);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		PageRank.main(new String[] {edgesPath, resultPath, "3"});
+		expected =  PageRankData.RANKS_AFTER_3_ITERATIONS;
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		GSAPageRank.main(new String[] {edgesPath, resultPath, "3"});
+		expected =  PageRankData.RANKS_AFTER_3_ITERATIONS;
+	}
+}