You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/08/07 11:59:44 UTC
flink git commit: [FLINK-2452] [Gelly] adds a playcount threshold to
the MusicProfiles example
Repository: flink
Updated Branches:
refs/heads/master 441ebf1ff -> f1dd914de
[FLINK-2452] [Gelly] adds a playcount threshold to the MusicProfiles example
This closes #968
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1dd914d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1dd914d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1dd914d
Branch: refs/heads/master
Commit: f1dd914de21313a90c3799438f1318349dd5d6df
Parents: 441ebf1
Author: vasia <va...@apache.org>
Authored: Fri Jul 31 22:12:18 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Aug 7 11:33:21 2015 +0200
----------------------------------------------------------------------
.../flink/graph/example/MusicProfiles.java | 41 +++++++++++++-------
.../graph/test/example/MusicProfilesITCase.java | 2 +-
2 files changed, 27 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a535216..0fc45bd 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -46,15 +46,17 @@ import org.apache.flink.util.Collector;
public class MusicProfiles implements ProgramDescription {
/**
- * This example demonstrates how to mix the "record" Flink API with the
- * graph API. The input is a set <userId - songId - playCount> triplets and
- * a set of bad records,i.e. song ids that should not be trusted. Initially,
- * we use the record API to filter out the bad records. Then, we use the
- * graph API to create a user -> song weighted bipartite graph and compute
- * the top song (most listened) per user. Then, we use the record API again,
- * to create a user-user similarity graph, based on common songs, where two
- * users that listen to the same song are connected. Finally, we use the
- * graph API to run the label propagation community detection algorithm on
+ * This example demonstrates how to mix the DataSet Flink API with the Gelly API.
+ * The input is a set of <userId - songId - playCount> triplets and
+ * a set of bad records, i.e. song ids that should not be trusted.
+ * Initially, we use the DataSet API to filter out the bad records.
+ * Then, we use Gelly to create a user -> song weighted bipartite graph and compute
+ * the top song (most listened) per user.
+ * Then, we use the DataSet API again, to create a user-user similarity graph,
+ * based on common songs, where users that are listeners of the same song
+ * are connected. A user-defined threshold on the playcount value
+ * defines when a user is considered to be a listener of a song.
+ * Finally, we use Gelly to run the label propagation community detection algorithm on
* the similarity graph.
*
* The triplets input is expected to be given as one triplet per line,
@@ -116,7 +118,13 @@ public class MusicProfiles implements ProgramDescription {
* create an edge between each pair of its in-neighbors.
*/
DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
- .getEdges().groupBy(1)
+ .getEdges()
+ // filter out user-song edges that are below the playcount threshold
+ .filter(new FilterFunction<Edge<String, Integer>>() {
+ public boolean filter(Edge<String, Integer> edge) {
+ return (edge.getValue() > playcountThreshold);
+ }
+ }).groupBy(1)
.reduceGroup(new CreateSimilarUserEdges()).distinct();
Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
@@ -241,6 +249,8 @@ public class MusicProfiles implements ProgramDescription {
private static String topTracksOutputPath = null;
+ private static int playcountThreshold = 0;
+
private static String communitiesOutputPath = null;
private static int maxIterations = 10;
@@ -248,10 +258,10 @@ public class MusicProfiles implements ProgramDescription {
private static boolean parseParameters(String[] args) {
if(args.length > 0) {
- if(args.length != 5) {
+ if(args.length != 6) {
System.err.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> "
- + "<output communities path> <num iterations>");
+ + "<playcount threshold> <output communities path> <num iterations>");
return false;
}
@@ -259,15 +269,16 @@ public class MusicProfiles implements ProgramDescription {
userSongTripletsInputPath = args[0];
mismatchesInputPath = args[1];
topTracksOutputPath = args[2];
- communitiesOutputPath = args[3];
- maxIterations = Integer.parseInt(args[4]);
+ playcountThreshold = Integer.parseInt(args[3]);
+ communitiesOutputPath = args[4];
+ maxIterations = Integer.parseInt(args[5]);
} else {
System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
System.out.println("Usage: MusicProfiles <input user song triplets path>" +
" <input song mismatches path> <output top tracks path> "
- + "<output communities path> <num iterations>");
+ + "<playcount threshold> <output communities path> <num iterations>");
}
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1dd914d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
index 0410d41..5aa9f26 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -73,7 +73,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase {
@Test
public void testMusicProfilesExample() throws Exception {
- MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, communitiesResultPath,
+ MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
MusicProfilesData.MAX_ITERATIONS + ""});
expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
}