You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/17 15:32:19 UTC

[1/2] flink git commit: [FLINK-1576] [gelly] improvements to the gelly examples. Updated GraphMetrics, MusicProfiles and PageRank to run with and without parameters. Added input descriptions to LabelPropagation and SSSP. Fixed some minor issues in the SS

Repository: flink
Updated Branches:
  refs/heads/master 1c50d87c1 -> 9077a53bf


[FLINK-1576] [gelly] improvements to the gelly examples.
Updated GraphMetrics, MusicProfiles and PageRank to run with and without parameters.
Added input descriptions to LabelPropagation and SSSP.
Fixed some minor issues in the SSSP example.
Fixed a bug in MusicProfiles that wasn't generating the user-user graph properly.
Changed the PageRank library method to initialize the vertex ranks.

This closes #470


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e795c437
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e795c437
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e795c437

Branch: refs/heads/master
Commit: e795c437e27205273018bfc9a43da38bb1701116
Parents: 1c50d87
Author: vasia <va...@gmail.com>
Authored: Mon Mar 9 23:47:31 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 16:02:23 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/GraphMetrics.java       |  80 +++++++++++---
 .../graph/example/LabelPropagationExample.java  |  20 ++--
 .../flink/graph/example/MusicProfiles.java      | 105 ++++++++++++++++---
 .../flink/graph/example/PageRankExample.java    |  77 +++++++++++---
 .../SingleSourceShortestPathsExample.java       |  78 +++++++-------
 .../utils/SingleSourceShortestPathsData.java    |  21 +---
 .../apache/flink/graph/library/PageRank.java    |  21 +++-
 .../example/LabelPropagationExampleITCase.java  |  64 +++++------
 .../SingleSourceShortestPathsITCase.java        |   8 +-
 9 files changed, 326 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 33f8f1a..a5ddf2a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
@@ -42,25 +43,24 @@ import org.apache.flink.types.NullValue;
  * - average node degree
  * - the vertex ids with the max/min in- and out-degrees
  *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * "<sourceVertexID>\t<targetVertexID>".
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
  */
 public class GraphMetrics implements ProgramDescription {
 
-	static final int NUM_VERTICES = 100;
-	static final long SEED = 9876;
-	
-
-	@Override
-	public String getDescription() {
-		return "Graph Metrics Example";
-	}
-
 	public static void main(String[] args) throws Exception {
 
+		if (!parseParameters(args)) {
+			return;
+		}
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		/** create a random graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(ExampleUtils
-				.getRandomEdges(env, NUM_VERTICES), env);
+		/** create the graph **/
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
 		
 		/** get the number of vertices **/
 		DataSet<Integer> numVertices = graph.numberOfVertices();
@@ -98,7 +98,7 @@ public class GraphMetrics implements ProgramDescription {
 
 		env.execute();
 	}
-	
+
 	@SuppressWarnings("serial")
 	private static final class AvgNodeDegreeMapper extends RichMapFunction<Tuple2<Long, Long>, Double> {
 
@@ -120,4 +120,58 @@ public class GraphMetrics implements ProgramDescription {
 	private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
 		public Long map(Tuple2<Long, Long> value) { return value.f0; }
 	}
+
+	@Override
+	public String getDescription() {
+		return "Graph Metrics Example";
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String edgesInputPath = null;
+
+	static final int NUM_VERTICES = 100;
+
+	static final long SEED = 9876;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 1) {
+				System.err.println("Usage: GraphMetrics <input edges>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgesInputPath = args[0];
+		} else {
+			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: GraphMetrics <input edges>");
+		}
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(Long.class, Long.class).map(
+							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
+									return new Edge<Long, NullValue>(value.f0, value.f1, 
+											NullValue.getInstance());
+								}
+					});
+		} else {
+			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index e399b3f..c43dbaa 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.LabelPropagation;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -38,6 +39,14 @@ import org.apache.flink.util.Collector;
  * the most frequent label among their neighbors. The algorithm converges when
  * no vertex changes value or the maximum number of iterations have been
  * reached.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs
+ * in the following format:"<sourceVertexID>\t<targetVertexID>".
+ *
+ * The vertices input file is expected to contain one vertex per line, with long IDs
+ * and long vertex values, in the following format:"<vertexID>\t<vertexValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
  */
 public class LabelPropagationExample implements ProgramDescription {
 
@@ -109,15 +118,10 @@ public class LabelPropagationExample implements ProgramDescription {
 
 		if (fileOutput) {
 			return env.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
-						@Override
-						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-							return new Vertex<Long, Long>(value.f0, value.f1);
-						}
-					});
+					.map(new Tuple2ToVertexMap<Long, Long>());
 		}
 
 		return env.generateSequence(1, numVertices).map(
@@ -133,7 +137,7 @@ public class LabelPropagationExample implements ProgramDescription {
 
 		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
 					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 948ac5b..9b18623 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -55,23 +55,33 @@ public class MusicProfiles implements ProgramDescription {
 	 * users that listen to the same song are connected. Finally, we use the
 	 * graph API to run the label propagation community detection algorithm on
 	 * the similarity graph.
+	 *
+	 * The triplets input is expected to be given as one triplet per line,
+	 * in the following format: "<userID>\t<songID>\t<playcount>".
+	 *
+	 * The mismatches input file is expected to contain one mismatch record per line,
+	 * in the following format:
+	 * "ERROR: <songID trackID> song_title"
+	 *
+	 * If no arguments are provided, the example runs with default data from {@link MusicProfilesData}.
 	 */
 	public static void main(String[] args) throws Exception {
 
+		if (!parseParameters(args)) {
+			return;
+		}
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		final int numIterations = 10;
 
 		/**
-		 * Read the user-song-play triplets The format is
-		 * <userID>\t<songID>\t<playcount>
+		 * Read the user-song-play triplets.
 		 */
-		DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env);
+		DataSet<Tuple3<String, String, Integer>> triplets = getUserSongTripletsData(env);
 
 		/**
-		 * Read the mismatches dataset and extract the songIDs The format is
-		 * "ERROR: <songID trackID> song_title"
+		 * Read the mismatches dataset and extract the songIDs
 		 */
-		DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
+		DataSet<Tuple1<String>> mismatches = getMismatchesData(env).map(new ExtractMismatchSongIds());
 
 		/**
 		 * Filter out the mismatches from the triplets dataset
@@ -93,7 +103,11 @@ public class MusicProfiles implements ProgramDescription {
 				.reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
 				.filter(new FilterSongNodes());
 
-		usersWithTopTrack.print();
+		if (fileOutput) {
+			usersWithTopTrack.writeAsCsv(topTracksOutputPath, "\n", "\t");
+		} else {
+			usersWithTopTrack.print();
+		}
 
 		/**
 		 * Create a user-user similarity graph, based on common songs, i.e. two
@@ -126,10 +140,14 @@ public class MusicProfiles implements ProgramDescription {
 							public Long map(Tuple2<Long, Long> value) {
 								return value.f1;
 							}
-						}).run(new LabelPropagation<String>(numIterations))
+						}).run(new LabelPropagation<String>(maxIterations))
 				.getVertices();
 
-		verticesWithCommunity.print();
+		if (fileOutput) {
+			verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");
+		} else {
+			verticesWithCommunity.print();
+		}
 
 		env.execute();
 	}
@@ -191,8 +209,10 @@ public class MusicProfiles implements ProgramDescription {
 				listeners.add(edge.getSource());
 			}
 			for (int i = 0; i < listeners.size() - 1; i++) {
-				out.collect(new Edge<String, NullValue>(listeners.get(i),
-						listeners.get(i + 1), NullValue.getInstance()));
+				for (int j = i + 1; j < listeners.size(); j++) {
+					out.collect(new Edge<String, NullValue>(listeners.get(i),
+							listeners.get(j), NullValue.getInstance()));
+				}
 			}
 		}
 	}
@@ -213,4 +233,65 @@ public class MusicProfiles implements ProgramDescription {
 	public String getDescription() {
 		return "Music Profiles Example";
 	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String userSongTripletsInputPath = null;
+
+	private static String mismatchesInputPath = null;
+
+	private static String topTracksOutputPath = null;
+
+	private static String communitiesOutputPath = null;
+
+	private static int maxIterations = 10;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 5) {
+				System.err.println("Usage: MusicProfiles <input user song triplets path>" +
+						" <input song mismatches path> <output top tracks path> "
+						+ "<output communities path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			userSongTripletsInputPath = args[0];
+			mismatchesInputPath = args[1];
+			topTracksOutputPath = args[2];
+			communitiesOutputPath = args[3];
+			maxIterations = Integer.parseInt(args[4]);
+		} else {
+			System.out.println("Executing Music Profiles example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: MusicProfiles <input user song triplets path>" +
+					" <input song mismatches path> <output top tracks path> "
+					+ "<output communities path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Tuple3<String, String, Integer>> getUserSongTripletsData(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(userSongTripletsInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(String.class, String.class, Integer.class);
+		} else {
+			return MusicProfilesData.getUserSongTriplets(env);
+		}
+	}
+
+	private static DataSet<String> getMismatchesData(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readTextFile(mismatchesInputPath);
+		} else {
+			return MusicProfilesData.getMismatches(env);
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index d279aa5..cc0b702 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -28,20 +28,38 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.library.PageRank;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.util.Collector;
 
+/**
+ * This example implements a simple PageRank algorithm, using a vertex-centric iteration.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and double
+ * values, in the following format:"<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with a random graph of 10 vertices
+ * and random edge weights.
+ *
+ */
 public class PageRankExample implements ProgramDescription {
 
 	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if(!parseParameters(args)) {
+			return;
+		}
 
-		DataSet<Vertex<Long, Double>> pages = getPagesDataSet(env);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
 
-		Graph<Long, Double, Double> network = Graph.fromDataSet(pages, links, env);
+		Graph<Long, Double, Double> network = Graph.fromDataSet(links, new MapFunction<Long, Double>() {
+
+			public Double map(Long value) throws Exception {
+				return 1.0;
+			}
+		}, env);
 
 		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
 
@@ -58,34 +76,63 @@ public class PageRankExample implements ProgramDescription {
 				new PageRank<Long>(DAMPENING_FACTOR, maxIterations))
 				.getVertices();
 
-		pageRanks.print();
+		if (fileOutput) {
+			pageRanks.writeAsCsv(outputPath, "\n", "\t");
+		} else {
+			pageRanks.print();
+		}
 
 		env.execute();
 	}
 
 	@Override
 	public String getDescription() {
-		return "PageRank";
+		return "PageRank example";
 	}
 
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
 	private static final double DAMPENING_FACTOR = 0.85;
 	private static long numPages = 10;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
 	private static int maxIterations = 10;
 
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Double>> getPagesDataSet(ExecutionEnvironment env) {
-		return env.generateSequence(1, numPages).map(
-				new MapFunction<Long, Vertex<Long, Double>>() {
-					@Override
-					public Vertex<Long, Double> map(Long l) throws Exception {
-						return new Vertex<Long, Double>(l, 1.0 / numPages);
-					}
-				});
-
+	private static boolean parseParameters(String[] args) {
+		// No arguments: run on the built-in random graph. Otherwise exactly three arguments are required.
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage: PageRank <input edges path> <output path> <num iterations>");
+				return false;
+			}
+			// Positions follow the usage string: args[0] = edges input, args[1] = output, args[2] = iterations.
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+		} else {
+			System.out.println("Executing PageRank example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRank <input edges path> <output path> <num iterations>");
+		}
+		return true;
 	}
 
 	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		}
+
 		return env.generateSequence(1, numPages).flatMap(
 				new FlatMapFunction<Long, Edge<Long, Double>>() {
 					@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 6c85397..ff523ce 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -30,8 +29,20 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.library.SingleSourceShortestPaths;
 
+/**
+ * This example implements the Single Source Shortest Paths algorithm,
+ * using a vertex-centric iteration.
+ *
+ * The input file is expected to contain one edge per line, with long IDs
+ * and double weights, in CSV format:
+ * "<sourceVertexID>\t<targetVertexID>\t<edgeValue>".
+ *
+ * If no arguments are provided, the example runs with default data from {@link SingleSourceShortestPathsData}.
+ *
+ */
 public class SingleSourceShortestPathsExample implements ProgramDescription {
 
+	@SuppressWarnings("serial")
 	public static void main(String[] args) throws Exception {
 
 		if (!parseParameters(args)) {
@@ -40,15 +51,19 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
-
 		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, Double>() {
+
+					public Double map(Long value) {
+						return Double.MAX_VALUE;
+					}
+		}, env);
 
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
-				.run(new SingleSourceShortestPaths<Long>(srcVertexId,
-						maxIterations)).getVertices();
+				.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations))
+				.getVertices();
 
 		// emit result
 		if (fileOutput) {
@@ -71,9 +86,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 	private static boolean fileOutput = false;
 
-	private static Long srcVertexId = null;
-
-	private static String verticesInputPath = null;
+	private static Long srcVertexId = 1l;
 
 	private static String edgesInputPath = null;
 
@@ -83,41 +96,27 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 
 	private static boolean parseParameters(String[] args) {
 
-		if (args.length > 0) {
-			if (args.length == 5) {
-				fileOutput = true;
-				srcVertexId = Long.parseLong(args[0]);
-				verticesInputPath = args[1];
-				edgesInputPath = args[2];
-				outputPath = args[3];
-				maxIterations = Integer.parseInt(args[4]);
-			} else {
+		if(args.length > 0) {
+			if(args.length != 4) {
 				System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-						" <input vertices path> <input edges path> <output path> <num iterations>");
+						" <input edges path> <output path> <num iterations>");
 				return false;
 			}
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(verticesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
 
-						@Override
-						public Vertex<Long, Double> map(Tuple2<Long, Double> tuple2) throws Exception {
-							return new Vertex<Long, Double>(tuple2.f0, tuple2.f1);
-						}
-					});
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
 		} else {
-			System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-					" <input vertices path> <input edges path> <output path> <num iterations>");
-			return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
+				System.out.println("Executing Single Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
 		}
+		return true;
 	}
 
 	@SuppressWarnings("serial")
@@ -125,6 +124,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 		if (fileOutput) {
 			return env.readCsvFile(edgesInputPath)
 					.lineDelimiter("\n")
+					.fieldDelimiter("\t")
 					.types(Long.class, Long.class, Double.class)
 					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
 
@@ -134,8 +134,6 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 						}
 					});
 		} else {
-			System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-					" <input vertices path> <input edges path> <output path> <num iterations>");
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
index 7e5445f..67cd150 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
@@ -21,33 +21,16 @@ package org.apache.flink.graph.example.utils;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class SingleSourceShortestPathsData {
 
-	public static final int NUM_VERTICES = 5;
-
 	public static final Long SRC_VERTEX_ID = 1L;
 
-	public static final String VERTICES = "1,1.0\n" + "2,2.0\n" + "3,3.0\n" + "4,4.0\n" + "5,5.0";
-
-	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
-
-		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
-		vertices.add(new Vertex<Long, Double>(1L, 1.0));
-		vertices.add(new Vertex<Long, Double>(2L, 2.0));
-		vertices.add(new Vertex<Long, Double>(3L, 3.0));
-		vertices.add(new Vertex<Long, Double>(4L, 4.0));
-		vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-		return env.fromCollection(vertices);
-	}
-
-	public static final String EDGES = "1,2,12.0\n" + "1,3,13.0\n" + "2,3,23.0\n" + "3,4,34.0\n" + "3,5,35.0\n" + 
-					"4,5,45.0\n" + "5,1,51.0";
+	public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
+					"4\t5\t45.0\n" + "5\t1\t51.0";
 
 	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index e43ee51..e06e64f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -41,9 +42,15 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+
+		DataSet<Integer> numberOfVertices = network.numberOfVertices();
+
 		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
 				new VertexRankUpdater<K>(beta), new RankMessenger<K>(), maxIterations);
-		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", network.numberOfVertices());
+
+		iteration.addBroadcastSetForMessagingFunction("numberOfVertices", numberOfVertices);
+		iteration.addBroadcastSetForUpdateFunction("numberOfVertices", numberOfVertices);
+
 		return network.runVertexCentricIteration(iteration);
 	}
 
@@ -55,7 +62,6 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class VertexRankUpdater<K extends Comparable<K> & Serializable>
 			extends VertexUpdateFunction<K, Double, Double> {
 
-		
 		private final double beta;
 		private int numVertices;
 		
@@ -91,8 +97,19 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public static final class RankMessenger<K extends Comparable<K> & Serializable>
 			extends MessagingFunction<K, Double, Double, Double> {
 
+		private int numVertices;
+
+		@Override
+		public void preSuperstep(){
+			numVertices = (Integer) getBroadcastSet("numberOfVertices").iterator().next();
+		}
+
 		@Override
 		public void sendMessages(K vertexId, Double newRank) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				newRank = 1.0 / numVertices;
+			}
 			for (Edge<K, Double> edge : getOutgoingEdges()) {
 				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
index 0e1810f..3298b7f 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
@@ -61,20 +61,20 @@ public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
 		 * Test one iteration of label propagation example with a simple graph
 		 */
 
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 30\n" +
-				"4 40\n" +
-				"5 40\n" +
-				"6 40\n" +
-				"7 70\n";
-
-		final String edges = "1 3\n" +
-				"2 3\n" +
-				"4 7\n" +
-				"5 7\n" +
-				"6 7\n" +
-				"7 3\n";
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	30\n" +
+				"4	40\n" +
+				"5	40\n" +
+				"6	40\n" +
+				"7	70\n";
+
+		final String edges = "1	3\n" +
+				"2	3\n" +
+				"4	7\n" +
+				"5	7\n" +
+				"6	7\n" +
+				"7	3\n";
 
 		String verticesPath = createTempFile(vertices);
 		String edgesPath = createTempFile(edges);
@@ -96,24 +96,24 @@ public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
 		 * Test the label propagation example where a tie must be broken
 		 */
 
-		final String vertices = "1 10\n" +
-				"2 10\n" +
-				"3 10\n" +
-				"4 10\n" +
-				"5 0\n" +
-				"6 20\n" +
-				"7 20\n" +
-				"8 20\n" +
-				"9 20\n";
-
-		final String edges = "1 5\n" +
-				"2 5\n" +
-				"3 5\n" +
-				"4 5\n" +
-				"6 5\n" +
-				"7 5\n" +
-				"8 5\n" +
-				"9 5\n";
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	10\n" +
+				"4	10\n" +
+				"5	0\n" +
+				"6	20\n" +
+				"7	20\n" +
+				"8	20\n" +
+				"9	20\n";
+
+		final String edges = "1	5\n" +
+				"2	5\n" +
+				"3	5\n" +
+				"4	5\n" +
+				"6	5\n" +
+				"7	5\n" +
+				"8	5\n" +
+				"9	5\n";
 
 		String verticesPath = createTempFile(vertices);
 		String edgesPath = createTempFile(edges);

http://git-wip-us.apache.org/repos/asf/flink/blob/e795c437/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index 9f563da..aa2d6f0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -36,8 +36,6 @@ import java.io.File;
 @RunWith(Parameterized.class)
 public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
 
-    private String verticesPath;
-
     private String edgesPath;
 
     private String resultPath;
@@ -54,20 +52,16 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
     @Before
     public void before() throws Exception {
         resultPath = tempFolder.newFile().toURI().toString();
-        File verticesFile = tempFolder.newFile();
-        Files.write(SingleSourceShortestPathsData.VERTICES, verticesFile, Charsets.UTF_8);
 
         File edgesFile = tempFolder.newFile();
         Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
-
-        verticesPath = verticesFile.toURI().toString();
         edgesPath = edgesFile.toURI().toString();
     }
 
     @Test
     public void testSSSPExample() throws Exception {
         SingleSourceShortestPathsExample.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                verticesPath, edgesPath, resultPath, SingleSourceShortestPathsData.NUM_VERTICES + ""});
+                edgesPath, resultPath, 10 + ""});
         expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }
 


[2/2] flink git commit: [FLINK-1652] fixes superstep increment in CollectionExecutor

Posted by va...@apache.org.
[FLINK-1652] fixes superstep increment in CollectionExecutor

This closes #464


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9077a53b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9077a53b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9077a53b

Branch: refs/heads/master
Commit: 9077a53bf536e96f04a818f029ddc6cf4a674fe4
Parents: e795c43
Author: vasia <va...@gmail.com>
Authored: Mon Mar 9 00:11:20 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Tue Mar 17 16:21:14 2015 +0200

----------------------------------------------------------------------
 .../common/operators/CollectionExecutor.java    | 21 +++--
 .../test/CollectionModeSuperstepITCase.java     | 87 ++++++++++++++++++++
 .../operations/DegreesWithExceptionITCase.java  |  1 -
 3 files changed, 100 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 2f9ae9a..78ad930 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -71,6 +71,8 @@ public class CollectionExecutor {
 	
 	private final ExecutionConfig executionConfig;
 
+	private int iterationSuperstep;
+
 	// --------------------------------------------------------------------------------------------
 	
 	public CollectionExecutor(ExecutionConfig executionConfig) {
@@ -183,7 +185,7 @@ public class CollectionExecutor {
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass().getClassLoader(), executionConfig) :
-					new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+					new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -225,7 +227,7 @@ public class CollectionExecutor {
 		RuntimeUDFContext ctx;
 		if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
 			ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig) :
-				new IterationRuntimeUDFContext(operator.getName(), 1, 0, superStep, classLoader, executionConfig);
+				new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, executionConfig);
 			
 			for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
 				List<?> bcData = execute(bcInputs.getValue());
@@ -279,7 +281,10 @@ public class CollectionExecutor {
 			
 			// set the input to the current partial solution
 			this.intermediateResults.put(iteration.getPartialSolution(), currentResult);
-			
+
+			// set the superstep number
+			iterationSuperstep = superstep;
+
 			// grab the current iteration result
 			currentResult = (List<T>) execute(iteration.getNextPartialSolution(), superstep);
 
@@ -373,6 +378,9 @@ public class CollectionExecutor {
 			this.intermediateResults.put(iteration.getSolutionSet(), currentSolution);
 			this.intermediateResults.put(iteration.getWorkset(), currentWorkset);
 
+			// set the superstep number
+			iterationSuperstep = superstep;
+
 			// grab the current iteration result
 			List<T> solutionSetDelta = (List<T>) execute(iteration.getSolutionSetDelta(), superstep);
 			this.intermediateResults.put(iteration.getSolutionSetDelta(), solutionSetDelta);
@@ -477,16 +485,13 @@ public class CollectionExecutor {
 	
 	private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
-		private final int superstep;
-
-		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, int superstep, ClassLoader classloader, ExecutionConfig executionConfig) {
+		public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, ExecutionConfig executionConfig) {
 			super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig);
-			this.superstep = superstep;
 		}
 
 		@Override
 		public int getSuperstepNumber() {
-			return superstep;
+			return iterationSuperstep;
 		}
 
 		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
new file mode 100644
index 0000000..ffe91d9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CollectionModeSuperstepITCase {
+
+	/**
+	 * Dummy iteration to test that the supersteps are correctly incremented
+	 * and can be retrieved from inside the update and messaging functions.
+	 * All vertices start with value 1 and increase their value by 1
+	 * in each iteration. 
+	 */
+	@Test
+	public void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+		
+		VertexCentricIteration<Long, Long, Long, Long> iteration = 
+				graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+
+		result.getVertices().map(
+				new VertexToTuple2Map<Long, Long>()).output(
+						new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		env.execute();
+	}
+	
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long superstep = getSuperstepNumber();
+			Assert.assertEquals(true, vertexValue == superstep);
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			long superstep = getSuperstepNumber();
+			Assert.assertEquals(true, vertexValue == superstep);
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		public Long map(Vertex<Long, Long> value) {
+			return 1l;
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9077a53b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index e83802c..18826b6 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -36,7 +36,6 @@ import java.util.NoSuchElementException;
 
 import static org.junit.Assert.*;
 
-@SuppressWarnings("serial")
 public class DegreesWithExceptionITCase {
 
 	private static final int PARALLELISM = 4;