You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/08/07 11:59:44 UTC

flink git commit: [FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example

Repository: flink
Updated Branches:
  refs/heads/master 441ebf1ff -> f1dd914de


[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example

This closes #968


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1dd914d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1dd914d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1dd914d

Branch: refs/heads/master
Commit: f1dd914de21313a90c3799438f1318349dd5d6df
Parents: 441ebf1
Author: vasia <va...@apache.org>
Authored: Fri Jul 31 22:12:18 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Aug 7 11:33:21 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/MusicProfiles.java      | 41 +++++++++++++-------
 .../graph/test/example/MusicProfilesITCase.java |  2 +-
 2 files changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a535216..0fc45bd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -46,15 +46,17 @@ import org.apache.flink.util.Collector;
 public class MusicProfiles implements ProgramDescription {
 
 	/**
-	 * This example demonstrates how to mix the "record" Flink API with the
-	 * graph API. The input is a set <userId - songId - playCount> triplets and
-	 * a set of bad records,i.e. song ids that should not be trusted. Initially,
-	 * we use the record API to filter out the bad records. Then, we use the
-	 * graph API to create a user -> song weighted bipartite graph and compute
-	 * the top song (most listened) per user. Then, we use the record API again,
-	 * to create a user-user similarity graph, based on common songs, where two
-	 * users that listen to the same song are connected. Finally, we use the
-	 * graph API to run the label propagation community detection algorithm on
+	 * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+	 * The input is a set of <userId - songId - playCount> triplets and
+	 * a set of bad records, i.e. song ids that should not be trusted.
+	 * Initially, we use the DataSet API to filter out the bad records.
+	 * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
+	 * the top song (most listened) per user.
+	 * Then, we use the DataSet API again, to create a user-user similarity graph,
+	 * based on common songs, where users that are listeners of the same song
+	 * are connected. A user-defined threshold on the playcount value
+	 * defines when a user is considered to be a listener of a song.
+	 * Finally, we use Gelly to run the label propagation community detection algorithm on
 	 * the similarity graph.
 	 *
 	 * The triplets input is expected to be given as one triplet per line,
@@ -116,7 +118,13 @@ public class MusicProfiles implements ProgramDescription {
 		 * create an edge between each pair of its in-neighbors.
 		 */
 		DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
-				.getEdges().groupBy(1)
+				.getEdges()
+				// keep only user-song edges whose playcount is strictly greater than the threshold
+				.filter(new FilterFunction<Edge<String, Integer>>() {
+					public boolean filter(Edge<String, Integer> edge) {
+						return (edge.getValue() > playcountThreshold);
+					}
+				}).groupBy(1)
 				.reduceGroup(new CreateSimilarUserEdges()).distinct();
 
 		Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
@@ -241,6 +249,8 @@ public class MusicProfiles implements ProgramDescription {
 
 	private static String topTracksOutputPath = null;
 
+	private static int playcountThreshold = 0;
+
 	private static String communitiesOutputPath = null;
 
 	private static int maxIterations = 10;
@@ -248,10 +258,10 @@ public class MusicProfiles implements ProgramDescription {
 	private static boolean parseParameters(String[] args) {
 
 		if(args.length > 0) {
-			if(args.length != 5) {
+			if(args.length != 6) {
 				System.err.println("Usage: MusicProfiles <input user song triplets path>" +
 						" <input song mismatches path> <output top tracks path> "
-						+ "<output communities path> <num iterations>");
+						+ "<playcount threshold> <output communities path> <num iterations>");
 				return false;
 			}
 
@@ -259,15 +269,16 @@ public class MusicProfiles implements ProgramDescription {
 			userSongTripletsInputPath = args[0];
 			mismatchesInputPath = args[1];
 			topTracksOutputPath = args[2];
-			communitiesOutputPath = args[3];
-			maxIterations = Integer.parseInt(args[4]);
+			playcountThreshold = Integer.parseInt(args[3]);
+			communitiesOutputPath = args[4];
+			maxIterations = Integer.parseInt(args[5]);
 		} else {
 			System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
 			System.out.println("  Provide parameters to read input data from files.");
 			System.out.println("  See the documentation for the correct format of input files.");
 			System.out.println("Usage: MusicProfiles <input user song triplets path>" +
 					" <input song mismatches path> <output top tracks path> "
-					+ "<output communities path> <num iterations>");
+					+ "<playcount threshold> <output communities path> <num iterations>");
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
index 0410d41..5aa9f26 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -73,7 +73,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testMusicProfilesExample() throws Exception {
-		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath,
+		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
 				MusicProfilesData.MAX_ITERATIONS + ""});
 		expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
 	}