You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/19 11:10:25 UTC

[3/5] flink git commit: [FLINK-3013] [gelly] Incorrect package declaration in GellyScalaAPICompletenessTest.scala

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 294926f..85373d3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -32,176 +32,156 @@ import org.apache.flink.graph.Vertex;
 
 public class TestGraphUtils {
 
-	public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
+	public static DataSet<Vertex<Long, Long>> getLongLongVertexData(
 			ExecutionEnvironment env) {
 
 		return env.fromCollection(getLongLongVertices());
 	}
 	
-	public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
+	public static DataSet<Edge<Long, Long>> getLongLongEdgeData(
 			ExecutionEnvironment env) {
 
 		return env.fromCollection(getLongLongEdges());
 	}
 
-	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
+	public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
 			ExecutionEnvironment env) {
 		List<Edge<Long, Long>> edges = getLongLongEdges();
 
 		edges.remove(1);
-		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
+		edges.add(new Edge<>(13L, 3L, 13L));
 
 		return env.fromCollection(edges);
 	}
 
-	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
+	public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
 			ExecutionEnvironment env) {
 		List<Edge<Long, Long>> edges =  getLongLongEdges();
 
 		edges.remove(0);
-		edges.add(new Edge<Long, Long>(3L, 13L, 13L));
+		edges.add(new Edge<>(3L, 13L, 13L));
 
 		return env.fromCollection(edges);
 	}
 
-	public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
+	public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
 			ExecutionEnvironment env) {
 		List<Edge<Long, Long>> edges = getLongLongEdges();
 		edges.remove(0);
 		edges.remove(1);
 		edges.remove(2);
-		edges.add(new Edge<Long, Long>(13L, 3L, 13L));
-		edges.add(new Edge<Long, Long>(1L, 12L, 12L));
-		edges.add(new Edge<Long, Long>(13L, 33L, 13L));
+		edges.add(new Edge<>(13L, 3L, 13L));
+		edges.add(new Edge<>(1L, 12L, 12L));
+		edges.add(new Edge<>(13L, 33L, 13L));
 		return env.fromCollection(edges);
 	}
 
-	public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
+	public static DataSet<Edge<String, Long>> getStringLongEdgeData(
 			ExecutionEnvironment env) {
-		List<Edge<String, Long>> edges = new ArrayList<Edge<String, Long>>();
-		edges.add(new Edge<String, Long>("1", "2", 12L));
-		edges.add(new Edge<String, Long>("1", "3", 13L));
-		edges.add(new Edge<String, Long>("2", "3", 23L));
-		edges.add(new Edge<String, Long>("3", "4", 34L));
-		edges.add(new Edge<String, Long>("3", "5", 35L));
-		edges.add(new Edge<String, Long>("4", "5", 45L));
-		edges.add(new Edge<String, Long>("5", "1", 51L));
+		List<Edge<String, Long>> edges = new ArrayList<>();
+		edges.add(new Edge<>("1", "2", 12L));
+		edges.add(new Edge<>("1", "3", 13L));
+		edges.add(new Edge<>("2", "3", 23L));
+		edges.add(new Edge<>("3", "4", 34L));
+		edges.add(new Edge<>("3", "5", 35L));
+		edges.add(new Edge<>("4", "5", 45L));
+		edges.add(new Edge<>("5", "1", 51L));
 		return env.fromCollection(edges);
 	}
 
-	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
+	public static DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
-		tuples.add(new Tuple2<Long, Long>(1L, 10L));
-		tuples.add(new Tuple2<Long, Long>(2L, 20L));
-		tuples.add(new Tuple2<Long, Long>(3L, 30L));
-		tuples.add(new Tuple2<Long, Long>(4L, 40L));
-		tuples.add(new Tuple2<Long, Long>(6L, 60L));
+		List<Tuple2<Long, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(1L, 10L));
+		tuples.add(new Tuple2<>(2L, 20L));
+		tuples.add(new Tuple2<>(3L, 30L));
+		tuples.add(new Tuple2<>(4L, 40L));
+		tuples.add(new Tuple2<>(6L, 60L));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
+	public static DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
-		tuples.add(new Tuple2<Long, Long>(1L, 10L));
-		tuples.add(new Tuple2<Long, Long>(1L, 20L));
-		tuples.add(new Tuple2<Long, Long>(2L, 30L));
-		tuples.add(new Tuple2<Long, Long>(3L, 40L));
-		tuples.add(new Tuple2<Long, Long>(3L, 50L));
-		tuples.add(new Tuple2<Long, Long>(4L, 60L));
-		tuples.add(new Tuple2<Long, Long>(6L, 70L));
+		List<Tuple2<Long, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(1L, 10L));
+		tuples.add(new Tuple2<>(1L, 20L));
+		tuples.add(new Tuple2<>(2L, 30L));
+		tuples.add(new Tuple2<>(3L, 40L));
+		tuples.add(new Tuple2<>(3L, 50L));
+		tuples.add(new Tuple2<>(4L, 60L));
+		tuples.add(new Tuple2<>(6L, 70L));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
+	public static DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
-		tuples.add(new Tuple2<Long, Long>(2L, 10L));
-		tuples.add(new Tuple2<Long, Long>(3L, 20L));
-		tuples.add(new Tuple2<Long, Long>(3L, 30L));
-		tuples.add(new Tuple2<Long, Long>(4L, 40L));
-		tuples.add(new Tuple2<Long, Long>(6L, 50L));
-		tuples.add(new Tuple2<Long, Long>(6L, 60L));
-		tuples.add(new Tuple2<Long, Long>(1L, 70L));
+		List<Tuple2<Long, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(2L, 10L));
+		tuples.add(new Tuple2<>(3L, 20L));
+		tuples.add(new Tuple2<>(3L, 30L));
+		tuples.add(new Tuple2<>(4L, 40L));
+		tuples.add(new Tuple2<>(6L, 50L));
+		tuples.add(new Tuple2<>(6L, 60L));
+		tuples.add(new Tuple2<>(1L, 70L));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
+	public static DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
 			ExecutionEnvironment env) {
-		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<Tuple3<Long, Long, Long>>();
-		tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L));
-		tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L));
-		tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L));
-		tuples.add(new Tuple3<Long, Long, Long>(3L, 4L, 34L));
-		tuples.add(new Tuple3<Long, Long, Long>(3L, 6L, 36L));
-		tuples.add(new Tuple3<Long, Long, Long>(4L, 6L, 46L));
-		tuples.add(new Tuple3<Long, Long, Long>(6L, 1L, 61L));
+		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple3<>(1L, 2L, 12L));
+		tuples.add(new Tuple3<>(1L, 3L, 13L));
+		tuples.add(new Tuple3<>(2L, 3L, 23L));
+		tuples.add(new Tuple3<>(3L, 4L, 34L));
+		tuples.add(new Tuple3<>(3L, 6L, 36L));
+		tuples.add(new Tuple3<>(4L, 6L, 46L));
+		tuples.add(new Tuple3<>(6L, 1L, 61L));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
+	public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
-				DummyCustomParameterizedType<Float>>>();
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
-				new DummyCustomParameterizedType<Float>(10, 10f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
-				new DummyCustomParameterizedType<Float>(20, 20f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
-				new DummyCustomParameterizedType<Float>(30, 30f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
-				new DummyCustomParameterizedType<Float>(40, 40f)));
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f)));
+		tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(20, 20f)));
+		tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f)));
+		tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f)));
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
+	public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
-				DummyCustomParameterizedType<Float>>>();
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
-				new DummyCustomParameterizedType<Float>(10, 10f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
-				new DummyCustomParameterizedType<Float>(20, 20f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
-				new DummyCustomParameterizedType<Float>(30, 30f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
-				new DummyCustomParameterizedType<Float>(40, 40f)));
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f)));
+		tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(20, 20f)));
+		tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(30, 30f)));
+		tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(40, 40f)));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
+	public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
 			ExecutionEnvironment env) {
-		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
-				DummyCustomParameterizedType<Float>>>();
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
-				new DummyCustomParameterizedType<Float>(10, 10f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
-				new DummyCustomParameterizedType<Float>(20, 20f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
-				new DummyCustomParameterizedType<Float>(30, 30f)));
-		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
-				new DummyCustomParameterizedType<Float>(40, 40f)));
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+		tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(10, 10f)));
+		tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(20, 20f)));
+		tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f)));
+		tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f)));
 
 		return env.fromCollection(tuples);
 	}
 
-	public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
+	public static DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
 			ExecutionEnvironment env) {
-		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = 
-				new ArrayList<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>();
-		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 2L,
-				new DummyCustomParameterizedType<Float>(10, 10f)));
-		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 3L,
-				new DummyCustomParameterizedType<Float>(20, 20f)));
-		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(2L, 3L,
-				new DummyCustomParameterizedType<Float>(30, 30f)));
-		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(3L, 4L,
-				new DummyCustomParameterizedType<Float>(40, 40f)));
+		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
+		tuples.add(new Tuple3<>(1L, 2L, new DummyCustomParameterizedType<>(10, 10f)));
+		tuples.add(new Tuple3<>(1L, 3L, new DummyCustomParameterizedType<>(20, 20f)));
+		tuples.add(new Tuple3<>(2L, 3L, new DummyCustomParameterizedType<>(30, 30f)));
+		tuples.add(new Tuple3<>(3L, 4L, new DummyCustomParameterizedType<>(40, 40f)));
 
 		return env.fromCollection(tuples);
 	}
@@ -209,12 +189,12 @@ public class TestGraphUtils {
 	/**
 	 * A graph with invalid vertex ids
 	 */
-	public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
+	public static DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
 			ExecutionEnvironment env) {
 		List<Vertex<Long, Long>> vertices = getLongLongVertices();
 
 		vertices.remove(0);
-		vertices.add(new Vertex<Long, Long>(15L, 1L));
+		vertices.add(new Vertex<>(15L, 1L));
 
 		return env.fromCollection(vertices);
 	}
@@ -222,15 +202,15 @@ public class TestGraphUtils {
 	/**
 	 * A graph that has at least one vertex with no ingoing/outgoing edges
 	 */
-	public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
+	public static DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
 			ExecutionEnvironment env) {
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
-		edges.add(new Edge<Long, Long>(1L, 4L, 14L));
-		edges.add(new Edge<Long, Long>(1L, 5L, 15L));
-		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
-		edges.add(new Edge<Long, Long>(3L, 5L, 35L));
-		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
+		List<Edge<Long, Long>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 2L, 12L));
+		edges.add(new Edge<>(1L, 4L, 14L));
+		edges.add(new Edge<>(1L, 5L, 15L));
+		edges.add(new Edge<>(2L, 3L, 23L));
+		edges.add(new Edge<>(3L, 5L, 35L));
+		edges.add(new Edge<>(4L, 5L, 45L));
 
 		return env.fromCollection(edges);
 	}
@@ -238,35 +218,34 @@ public class TestGraphUtils {
 	/**
 	 * Function that produces an ArrayList of vertices
 	 */
-	public static final List<Vertex<Long, Long>> getLongLongVertices() {
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		vertices.add(new Vertex<Long, Long>(1L, 1L));
-		vertices.add(new Vertex<Long, Long>(2L, 2L));
-		vertices.add(new Vertex<Long, Long>(3L, 3L));
-		vertices.add(new Vertex<Long, Long>(4L, 4L));
-		vertices.add(new Vertex<Long, Long>(5L, 5L));
+	public static List<Vertex<Long, Long>> getLongLongVertices() {
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 1L));
+		vertices.add(new Vertex<>(2L, 2L));
+		vertices.add(new Vertex<>(3L, 3L));
+		vertices.add(new Vertex<>(4L, 4L));
+		vertices.add(new Vertex<>(5L, 5L));
 
 		return vertices;
 	}
 
-	public static final List<Vertex<Long, Boolean>> getLongBooleanVertices() {
-		List<Vertex<Long, Boolean>> vertices = new ArrayList<Vertex<Long, Boolean>>();
-		vertices.add(new Vertex<Long, Boolean>(1L, true));
-		vertices.add(new Vertex<Long, Boolean>(2L, true));
-		vertices.add(new Vertex<Long, Boolean>(3L, true));
-		vertices.add(new Vertex<Long, Boolean>(4L, true));
-		vertices.add(new Vertex<Long, Boolean>(5L, true));
+	public static List<Vertex<Long, Boolean>> getLongBooleanVertices() {
+		List<Vertex<Long, Boolean>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, true));
+		vertices.add(new Vertex<>(2L, true));
+		vertices.add(new Vertex<>(3L, true));
+		vertices.add(new Vertex<>(4L, true));
+		vertices.add(new Vertex<>(5L, true));
 
 		return vertices;
 	}
 
-	public static final DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(
-				ExecutionEnvironment env) {
-			List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-			edges.add(new Edge<Long, Long>(1L, 2L, 12L));
-			edges.add(new Edge<Long, Long>(1L, 3L, 13L));
-			edges.add(new Edge<Long, Long>(2L, 3L, 23L));
-			edges.add(new Edge<Long, Long>(4L, 5L, 45L));
+	public static DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(ExecutionEnvironment env) {
+			List<Edge<Long, Long>> edges = new ArrayList<>();
+			edges.add(new Edge<>(1L, 2L, 12L));
+			edges.add(new Edge<>(1L, 3L, 13L));
+			edges.add(new Edge<>(2L, 3L, 23L));
+			edges.add(new Edge<>(4L, 5L, 45L));
 			
 			return env.fromCollection(edges);
 		}
@@ -274,15 +253,15 @@ public class TestGraphUtils {
 	/**
 	 * Function that produces an ArrayList of edges
 	 */
-	public static final List<Edge<Long, Long>> getLongLongEdges() {
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(1L, 2L, 12L));
-		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
-		edges.add(new Edge<Long, Long>(2L, 3L, 23L));
-		edges.add(new Edge<Long, Long>(3L, 4L, 34L));
-		edges.add(new Edge<Long, Long>(3L, 5L, 35L));
-		edges.add(new Edge<Long, Long>(4L, 5L, 45L));
-		edges.add(new Edge<Long, Long>(5L, 1L, 51L));
+	public static List<Edge<Long, Long>> getLongLongEdges() {
+		List<Edge<Long, Long>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 2L, 12L));
+		edges.add(new Edge<>(1L, 3L, 13L));
+		edges.add(new Edge<>(2L, 3L, 23L));
+		edges.add(new Edge<>(3L, 4L, 34L));
+		edges.add(new Edge<>(3L, 5L, 35L));
+		edges.add(new Edge<>(4L, 5L, 45L));
+		edges.add(new Edge<>(5L, 1L, 51L));
 	
 		return edges;
 	}
@@ -373,45 +352,41 @@ public class TestGraphUtils {
 
 	/**
 	 * utils for getting the second graph for the test of method difference();
-	 * @param env
+	 * @param env - ExecutionEnvironment
 	 */
-	public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference(
-			ExecutionEnvironment env){
+	public static DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference(ExecutionEnvironment env) {
 		return env.fromCollection(getLongLongEdgesForDifference());
 	}
 
-	public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference2(
-			ExecutionEnvironment env){
+	public static DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference2(ExecutionEnvironment env) {
 		return env.fromCollection(getLongLongEdgesForDifference2());
 	}
 
-	public static final DataSet<Vertex<Long,Long>> getLongLongVertexDataDifference(
-			ExecutionEnvironment env)
-	{
+	public static DataSet<Vertex<Long,Long>> getLongLongVertexDataDifference(ExecutionEnvironment env) {
 		return env.fromCollection(getVerticesForDifference());
 	}
 
-	public static final List<Vertex<Long,Long>> getVerticesForDifference(){
-		List<Vertex<Long,Long>> vertices = new ArrayList<Vertex<Long,Long>>();
-		vertices.add(new Vertex<Long, Long>(1L, 1L));
-		vertices.add(new Vertex<Long, Long>(3L, 3L));
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
+	public static List<Vertex<Long,Long>> getVerticesForDifference(){
+		List<Vertex<Long,Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 1L));
+		vertices.add(new Vertex<>(3L, 3L));
+		vertices.add(new Vertex<>(6L, 6L));
 
 		return vertices;
 
 	}
 
-	public static final List<Edge<Long, Long>> getLongLongEdgesForDifference() {
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(1L, 3L, 13L));
-		edges.add(new Edge<Long, Long>(1L, 6L, 26L));
-		edges.add(new Edge<Long, Long>(6L, 3L, 63L));
+	public static List<Edge<Long, Long>> getLongLongEdgesForDifference() {
+		List<Edge<Long, Long>> edges = new ArrayList<>();
+		edges.add(new Edge<>(1L, 3L, 13L));
+		edges.add(new Edge<>(1L, 6L, 26L));
+		edges.add(new Edge<>(6L, 3L, 63L));
 		return edges;
 	}
 
-	public static final List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
-		edges.add(new Edge<Long, Long>(6L, 6L, 66L));
+	public static List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
+		List<Edge<Long, Long>> edges = new ArrayList<>();
+		edges.add(new Edge<>(6L, 6L, 66L));
 		return edges;
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 567b194..0820e6a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -75,7 +75,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				new UpdateFunction(), new MessageFunction(), 10, parameters);
 
 		DataSet<Vertex<Long,Long>> data = res.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result= data.collect();
         
 		expectedResult = "1,11\n" +
 						"2,11\n" +
@@ -137,7 +137,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 
 		
 		DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>());
-        List<Tuple2<Long, Long>> result= data.collect();
+		List<Tuple2<Long, Long>> result= data.collect();
         
 		expectedResult = "1,6\n" +
 						"2,6\n" +
@@ -231,7 +231,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
 				.getVertices();
 
-        List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
+		List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
 
 		expectedResult = "1,[2, 3, 5]\n" +
 				"2,[1, 3]\n" +
@@ -264,7 +264,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
 				.getVertices();
 
-        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
 
 		expectedResult = "1,[2, 3]\n" +
 				"2,[3]\n" +
@@ -297,7 +297,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
 				.getVertices();
 
-        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
 
 		expectedResult = "1,[5]\n" +
 				"2,[1]\n" +
@@ -330,7 +330,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
 				.getVertices();
 
-        List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
+		List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
 
 		expectedResult = "1,[2, 3, 5]\n" +
 				"2,[1, 3]\n" +
@@ -356,7 +356,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
 				new DummyMessageFunction(), 2).getVertices();
 
-        List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
+		List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
 
 		expectedResult = "1,-1\n" +
 				"2,-1\n" +
@@ -388,7 +388,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
+		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
 		expectedResult = "1,1\n" +
 				"2,1\n" +
@@ -413,7 +413,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
+		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
 		expectedResult = "1,-1\n" +
 				"2,-1\n" +
@@ -445,7 +445,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
+		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
 		expectedResult = "1,2\n" +
 				"2,1\n" +
@@ -470,7 +470,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
 				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-        List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
+		List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
 
 		expectedResult = "1,-1\n" +
 				"2,-1\n" +
@@ -502,7 +502,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runVertexCentricIteration(
 				new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices();
 
-        List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
+		List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
 
 		expectedResult = "1,true\n" +
 				"2,true\n" +
@@ -548,8 +548,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
 
-		LongSumAggregator aggregator = new LongSumAggregator();
-
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
 
@@ -643,11 +641,11 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public void sendMessages(Vertex<Long, Long> vertex) {
-			if (vertex.getId().equals(1)) {
+			if (vertex.getId() == 1) {
 				Assert.assertEquals(2, getOutDegree());
 				Assert.assertEquals(1, getInDegree());
 			}
-			else if(vertex.getId().equals(3)) {
+			else if(vertex.getId() == 3) {
 				Assert.assertEquals(2, getOutDegree());
 				Assert.assertEquals(2, getInDegree());
 			}
@@ -735,7 +733,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		@Override
 		public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
 			for (Edge<Long, Long> edge : getEdges()) {
-				if(edge.getSource() != vertex.getId()) {
+				if(!edge.getSource().equals(vertex.getId())) {
 					sendMessageTo(edge.getSource(), vertex.getId());
 				} else {
 					sendMessageTo(edge.getTarget(), vertex.getId());
@@ -759,7 +757,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		@Override
 		public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
 			for (Edge<Long, Long> edge : getEdges()) {
-				if(edge.getSource() != vertex.getId()) {
+				if(!edge.getSource().equals(vertex.getId())) {
 					sendMessageTo(edge.getSource(), vertex.getId());
 				} else {
 					sendMessageTo(edge.getTarget(), vertex.getId());
@@ -783,7 +781,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
 		public Long map(Vertex<Long, Long> value) {
-			return 1l;
+			return 1L;
 		}
 	}
 
@@ -792,7 +790,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
-			return new HashSet<Long>();
+			return new HashSet<>();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
index c19411b..a23bb61 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
@@ -92,7 +92,7 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
 		DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
 		DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
 		// the edge to be removed is a non-SP edge
-		Edge<Long, Double> edgeToBeRemoved = new Edge<Long, Double>(3L, 5L, 5.0);
+		Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
 
 		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
 		// Assumption: all minimum weight paths are kept

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
index 5aa9f26..8152885 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
@@ -82,7 +82,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase {
 	public void after() throws Exception {
 		compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
 
-		ArrayList<String> list = new ArrayList<String>();
+		ArrayList<String> list = new ArrayList<>();
 		readAllResultLines(list, communitiesResultPath, new String[]{}, false);
 
 		String[] result = list.toArray(new String[list.size()]);

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
index 9eb7387..c8d85f0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -73,7 +73,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
 		@Override
 		public Vertex<Long, Long> map(Long value) {
-			return new Vertex<Long, Long>(value, value);
+			return new Vertex<>(value, value);
 		}
 	}
 
@@ -87,7 +87,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
 		public Edge<Long, NullValue> map(String value) {
 			String[] nums = value.split(" ");
-			return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+			return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
 					NullValue.getInstance());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index 94c7713..431ab70 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -41,8 +41,6 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String expectedResult;
-
 	@Test
 	public void testPageRankWithThreeIterations() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -53,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
         List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
         		.collect();
         
-        compareWithDelta(result, expectedResult, 0.01);
+        compareWithDelta(result, 0.01);
 	}
 
 	@Test
@@ -66,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
         List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
         		.collect();
         
-        compareWithDelta(result, expectedResult, 0.01);
+        compareWithDelta(result, 0.01);
 	}
 
 	@Test
@@ -79,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
         List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
         		.collect();
         
-        compareWithDelta(result, expectedResult, 0.01);
+        compareWithDelta(result, 0.01);
 	}
 
 	@Test
@@ -92,18 +90,18 @@ public class PageRankITCase extends MultipleProgramsTestBase {
         List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
         		.collect();
         
-        compareWithDelta(result, expectedResult, 0.01);
+        compareWithDelta(result, 0.01);
 	}
 
 	private void compareWithDelta(List<Vertex<Long, Double>> result,
-			String expectedResult, double delta) {
+																double delta) {
 
 		String resultString = "";
         for (Vertex<Long, Double> v : result) {
         	resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
         }
-        
-		expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+
+		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
 		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
 
 		String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
index 1d9ab9f..15f59fe 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -34,8 +34,6 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class TriangleCountITCase extends MultipleProgramsTestBase {
 
-	private String expectedResult;
-
 	public TriangleCountITCase(TestExecutionMode mode) {
 		super(mode);
 	}
@@ -49,7 +47,7 @@ public class TriangleCountITCase extends MultipleProgramsTestBase {
 				env).getUndirected();
 
 		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
-		expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
+		String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
 
 		Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index 20f4454..9995bad 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -124,7 +124,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		//env.fromElements(result).writeAsText(resultPath);
 		
 		String res= valid.toString();//env.fromElements(valid);
-        List<String> result= new LinkedList<String>();
+        List<String> result= new LinkedList<>();
         result.add(res);
 		expectedResult = "true";
 		
@@ -144,7 +144,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 		Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
 		
 		String res= valid.toString();//env.fromElements(valid);
-        List<String> result= new LinkedList<String>();
+        List<String> result= new LinkedList<>();
         result.add(res);
 
 		expectedResult = "false\n";
@@ -216,8 +216,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
 	private static final class AssignCustomVertexValueMapper implements
 		MapFunction<Long, DummyCustomParameterizedType<Double>> {
 
-		DummyCustomParameterizedType<Double> dummyValue =
-				new DummyCustomParameterizedType<Double>();
+		DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
 
 		public DummyCustomParameterizedType<Double> map(Long vertexId) {
 			dummyValue.setIntField(vertexId.intValue()-1);

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
index 20cbca5..c78e6ba 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java
@@ -138,7 +138,7 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
 		public Tuple2<Long, Long> map(Long vertexId) {
-			return new Tuple2<Long, Long>(vertexId*2, 42l);
+			return new Tuple2<>(vertexId*2, 42L);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
index d6e5a9c..704a913 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java
@@ -39,8 +39,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-    private String expectedResult;
-
+	private String expectedResult;
 
 	@Test
 	public void testAddVertex() throws Exception {
@@ -53,10 +52,10 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L));
-        
+		graph = graph.addVertex(new Vertex<>(6L, 6L));
+
 		DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result = data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -64,7 +63,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,4\n" +
 				"5,5\n" +
 				"6,6\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -79,14 +78,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
-		vertices.add(new Vertex<Long, Long>(7L, 7L));
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(6L, 6L));
+		vertices.add(new Vertex<>(7L, 7L));
 
 		graph = graph.addVertices(vertices);
 
 		DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -95,7 +94,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"5,5\n" +
 				"6,6\n" +
 				"7,7\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -109,17 +108,17 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L));
-		
+		graph = graph.addVertex(new Vertex<>(1L, 1L));
+
 		DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -134,21 +133,21 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		vertices.add(new Vertex<Long, Long>(1L, 1L));
-		vertices.add(new Vertex<Long, Long>(3L, 3L));
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 1L));
+		vertices.add(new Vertex<>(3L, 3L));
 
 		graph = graph.addVertices(vertices);
 
 		DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -163,14 +162,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		vertices.add(new Vertex<Long, Long>(1L, 1L));
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		vertices.add(new Vertex<>(1L, 1L));
+		vertices.add(new Vertex<>(6L, 6L));
 
 		graph = graph.addVertices(vertices);
 
 		DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long,Long>> result= data.collect();
+		List<Vertex<Long,Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
@@ -178,7 +177,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,4\n" +
 				"5,5\n" +
 				"6,6\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -192,16 +191,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
+		graph = graph.removeVertex(new Vertex<>(5L, 5L));
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
 				"2,3,23\n" +
 				"3,4,34\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -216,19 +215,19 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
-		verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
-		verticesToBeRemoved.add(new Vertex<Long, Long>(2L, 2L));
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
+		verticesToBeRemoved.add(new Vertex<>(1L, 1L));
+		verticesToBeRemoved.add(new Vertex<>(2L, 2L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -236,16 +235,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 	public void testRemoveInvalidVertex() throws Exception {
 		/*
 		 * Test removeVertex() -- remove an invalid vertex
-		 */	
-		
+		 */
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
+		graph = graph.removeVertex(new Vertex<>(6L, 6L));
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -254,7 +253,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -268,20 +267,20 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
-		verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
-		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
+		verticesToBeRemoved.add(new Vertex<>(1L, 1L));
+		verticesToBeRemoved.add(new Vertex<>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "2,3,23\n" +
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -295,14 +294,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
-		verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
-		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
+		verticesToBeRemoved.add(new Vertex<>(6L, 6L));
+		verticesToBeRemoved.add(new Vertex<>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -311,7 +310,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -325,39 +324,38 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
-		verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
-		verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
+		List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
+		verticesToBeRemoved.add(new Vertex<>(6L, 6L));
+		verticesToBeRemoved.add(new Vertex<>(7L, 7L));
 
 		graph = graph.removeVertices(verticesToBeRemoved);
 
-        DataSet<Vertex<Long,Long>> data = graph.getVertices();
-        List<Vertex<Long, Long>> result= data.collect();
+		DataSet<Vertex<Long,Long>> data = graph.getVertices();
+		List<Vertex<Long, Long>> result= data.collect();
 
 		expectedResult = "1,1\n" +
 				"2,2\n" +
 				"3,3\n" +
 				"4,4\n" +
 				"5,5\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
-	
+
 	@Test
 	public void testAddEdge() throws Exception {
 		/*
 		 * Test addEdge() -- simple case
 		 */
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
-				61L);
+		graph = graph.addEdge(new Vertex<>(6L, 6L), new Vertex<>(1L, 1L), 61L);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -366,8 +364,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n" +
-				"6,1,61\n";	
-		
+				"6,1,61\n";
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -382,14 +380,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
-		edgesToBeAdded.add(new Edge<Long, Long>(2L, 4L, 24L));
-		edgesToBeAdded.add(new Edge<Long, Long>(4L, 1L, 41L));
+		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<>();
+		edgesToBeAdded.add(new Edge<>(2L, 4L, 24L));
+		edgesToBeAdded.add(new Edge<>(4L, 1L, 41L));
 
 		graph = graph.addEdges(edgesToBeAdded);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -400,7 +398,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"4,1,41\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -415,14 +413,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
-		edgesToBeAdded.add(new Edge<Long, Long>(6L, 1L, 61L));
-		edgesToBeAdded.add(new Edge<Long, Long>(7L, 1L, 71L));
+		List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<>();
+		edgesToBeAdded.add(new Edge<>(6L, 1L, 61L));
+		edgesToBeAdded.add(new Edge<>(7L, 1L, 71L));
 
 		graph = graph.addEdges(edgesToBeAdded);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -431,7 +429,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -445,11 +443,11 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
+		graph = graph.addEdge(new Vertex<>(1L, 1L), new Vertex<>(2L, 2L),
 				12L);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,2,12\n" +
@@ -458,8 +456,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n" +
-				"5,1,51\n";	
-		
+				"5,1,51\n";
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -473,10 +471,10 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
+		graph = graph.removeEdge(new Edge<>(5L, 1L, 51L));
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -484,7 +482,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -498,21 +496,21 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
-		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
-		edgesToBeRemoved.add(new Edge<Long, Long>(2L, 3L, 23L));
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
+		edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
+		edgesToBeRemoved.add(new Edge<>(2L, 3L, 23L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -526,14 +524,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
-		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
-		edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
+		edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
+		edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -541,7 +539,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,4,34\n" +
 				"3,5,35\n" +
 				"4,5,45\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -552,13 +550,13 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 		 */
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
+
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
+		graph = graph.removeEdge(new Edge<>(6L, 1L, 61L));
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +
@@ -567,7 +565,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 				"3,5,35\n" +
 				"4,5,45\n" +
 				"5,1,51\n";
-		
+
 		compareResultAsTuples(result, expectedResult);
 	}
 
@@ -581,14 +579,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
 
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
-		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
-		edgesToBeRemoved.add(new Edge<Long, Long>(1L, 1L, 51L));
-		edgesToBeRemoved.add(new Edge<Long, Long>(6L, 1L, 61L));
+		List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
+		edgesToBeRemoved.add(new Edge<>(1L, 1L, 51L));
+		edgesToBeRemoved.add(new Edge<>(6L, 1L, 61L));
 
 		graph = graph.removeEdges(edgesToBeRemoved);
 
-        DataSet<Edge<Long,Long>> data = graph.getEdges();
-        List<Edge<Long, Long>> result= data.collect();
+		DataSet<Edge<Long,Long>> data = graph.getEdges();
+		List<Edge<Long, Long>> result= data.collect();
 
 		expectedResult = "1,2,12\n" +
 				"1,3,13\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index ffc9da9..df37248 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -259,11 +259,11 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
-		List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
+		List<Vertex<Long, Long>> vertices = new ArrayList<>();
+		List<Edge<Long, Long>> edges = new ArrayList<>();
 
-		vertices.add(new Vertex<Long, Long>(6L, 6L));
-		edges.add(new Edge<Long, Long>(6L, 1L, 61L));
+		vertices.add(new Vertex<>(6L, 6L));
+		edges.add(new Edge<>(6L, 1L, 61L));
 
 		graph = graph.union(Graph.fromCollection(vertices, edges, env));
 
@@ -336,7 +336,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		DataSet<Vertex<Long, Long>> vertex = env.fromElements(new Vertex<Long, Long>(6L, 6L));
+		DataSet<Vertex<Long, Long>> vertex = env.fromElements(new Vertex<>(6L, 6L));
 
 		Graph<Long, Long, Long> graph2 = Graph.fromDataSet(vertex,TestGraphUtils.getLongLongEdgeDataDifference2(env),env);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
index e0bc35a..2fa3b8c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -473,8 +473,7 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
         public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
-                    edge.getTarget(), true);
+            return new Tuple3<>(edge.getSource(), edge.getTarget(), true);
         }
     }
 
@@ -512,28 +511,28 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
         public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
+            return new Tuple2<>(edge.getSource(), edge.getValue());
         }
     }
 
 	@SuppressWarnings("serial")
 	private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
         public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getSource(), true);
+            return new Tuple2<>(edge.getSource(), true);
         }
     }
 
 	@SuppressWarnings("serial")
 	private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
         public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
+            return new Tuple2<>(edge.getTarget(), edge.getValue());
         }
     }
 
 	@SuppressWarnings("serial")
 	private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
         public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
-            return new Tuple2<Long, Boolean>(edge.getTarget(), true);
+            return new Tuple2<>(edge.getTarget(), true);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
index 7a25788..5b77101 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -184,7 +184,7 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
         public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
-            return new Tuple2<Long, Boolean>(vertex.getId(), true);
+            return new Tuple2<>(vertex.getId(), true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
index 35f7b0e..5e751a5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
@@ -181,7 +181,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
 		public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
+			Tuple1<Long> tupleValue = new Tuple1<>();
 			tupleValue.setFields(edge.getValue());
 			return tupleValue;
 		}
@@ -201,7 +201,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
 		DummyCustomParameterizedType<Double>> {
 
 		public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
 			dummyValue.setIntField(edge.getValue().intValue());
 			dummyValue.setTField(new Double(edge.getValue()));						
 			return dummyValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
index 677a03c..108be3e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
@@ -190,7 +190,7 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
 		public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
-			Tuple1<Long> tupleValue = new Tuple1<Long>();
+			Tuple1<Long> tupleValue = new Tuple1<>();
 			tupleValue.setFields(vertex.getValue());
 			return tupleValue;
 		}
@@ -210,7 +210,7 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
 		DummyCustomParameterizedType<Double>> {
 		
 		public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
-			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
 			dummyValue.setIntField(vertex.getValue().intValue());
 			dummyValue.setTField(new Double(vertex.getValue()));						
 			return dummyValue;

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index 3bb19fa..3978d36 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.test.operations;
 
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -414,7 +415,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					minNeighborId = edge.getTarget();
 				}
 			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
+			out.collect(new Tuple2<>(v.getId(), minNeighborId));
 		}
 	}
 
@@ -432,7 +433,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					weight = edge.getValue();
 				}
 			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), weight));
+			out.collect(new Tuple2<>(v.getId(), weight));
 		}
 	}
 
@@ -470,7 +471,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					minNeighborId = edge.getSource();
 				}
 			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
+			out.collect(new Tuple2<>(v.getId(), minNeighborId));
 		}
 	}
 
@@ -482,7 +483,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 
 			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+				out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
 			}
 		}
 	}
@@ -496,7 +497,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 
 			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
 				if(edge.f0 != 5) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+					out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
 				}
 			}
 		}
@@ -511,7 +512,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 			for (Edge<Long, Long> edge: edges) {
 				if(v.getValue() > 2) {
-					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+					out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
 				}
 			}
 		}
@@ -525,7 +526,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 
 			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+				out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
 			}
 		}
 	}
@@ -539,7 +540,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 
 			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
 				if(edge.f0 != 5) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+					out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
 				}
 			}
 		}
@@ -554,7 +555,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 			for (Edge<Long, Long> edge: edges) {
 				if(v.getValue() > 2) {
-					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+					out.collect(new Tuple2<>(v.getId(), edge.getSource()));
 				}
 			}
 		}
@@ -567,10 +568,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
-				if (edge.f0 == edge.f1.getTarget()) {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+				if (Objects.equals(edge.f0, edge.f1.getTarget())) {
+					out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
 				} else {
-					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+					out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
 				}
 			}
 		}
@@ -584,10 +585,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 								 Collector<Tuple2<Long, Long>> out) throws Exception {
 			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
 				if(edge.f0 != 5 && edge.f0 != 2) {
-					if (edge.f0 == edge.f1.getTarget()) {
-						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+					if (Objects.equals(edge.f0, edge.f1.getTarget())) {
+						out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
 					} else {
-						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+						out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
 					}
 				}
 			}
@@ -604,9 +605,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 			for(Edge<Long, Long> edge : edges) {
 				if(v.getValue() > 4) {
 					if(v.getId().equals(edge.getTarget())) {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+						out.collect(new Tuple2<>(v.getId(), edge.getSource()));
 					} else {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+						out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index ab10947..61ef446 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -131,9 +131,9 @@ public class ReduceOnEdgesWithExceptionITCase {
 			for(Edge<Long, Long> edge : edges) {
 				if(v.getValue() > 4) {
 					if(v.getId().equals(edge.getTarget())) {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+						out.collect(new Tuple2<>(v.getId(), edge.getSource()));
 					} else {
-						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+						out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 7553b32..90e3342 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -420,7 +420,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<>(vertex.getId(), sum));
 		}
 	}
 
@@ -437,7 +437,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<>(vertex.getId(), sum));
 		}
 	}
 
@@ -454,7 +454,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
 		}
 	}
 
@@ -472,7 +472,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 					sum += neighbor.f1.getValue();
 			}
 			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+				out.collect(new Tuple2<>(vertex.getId(), sum));
 			}
 		}
 	}
@@ -491,7 +491,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
 			}
 			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+				out.collect(new Tuple2<>(vertex.getId(), sum));
 			}
 		}
 	}
@@ -510,7 +510,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				sum += neighbor.f1.getValue();
 			}
 			if(vertex.getId() > 3) {
-				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+				out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
 			}
 		}
 	}
@@ -533,13 +533,11 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 									 Collector<Tuple2<Long, Long>> out) throws Exception {
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
+			for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				next = neighbor;
 				sum += next.f2.getValue() * next.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(next.f0, sum));
+			out.collect(new Tuple2<>(next.f0, sum));
 		}
 	}
 
@@ -553,15 +551,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
+			for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				next = neighbor;
 				sum += next.f2.getValue();
 			}
 			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+				out.collect(new Tuple2<>(next.f0, sum));
+				out.collect(new Tuple2<>(next.f0, sum * 2));
 			}
 		}
 	}
@@ -576,15 +572,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
+			for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				next = neighbor;
 				sum += next.f2.getValue() * next.f1.getValue();
 			}
 			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+				out.collect(new Tuple2<>(next.f0, sum));
+				out.collect(new Tuple2<>(next.f0, sum * 2));
 			}
 		}
 	}
@@ -599,15 +593,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
-			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
-					neighbors.iterator();
-			while(neighborsIterator.hasNext()) {
-				next = neighborsIterator.next();
+			for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				next = neighbor;
 				sum += next.f2.getValue();
 			}
 			if(next.f0 > 2) {
-				out.collect(new Tuple2<Long, Long>(next.f0, sum));
-				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+				out.collect(new Tuple2<>(next.f0, sum));
+				out.collect(new Tuple2<>(next.f0, sum * 2));
 			}
 		}
 	}
@@ -625,8 +617,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
+			out.collect(new Tuple2<>(vertex.getId(), sum));
+			out.collect(new Tuple2<>(vertex.getId(), sum * 2));
 		}
 	}
 
@@ -643,8 +635,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
+			out.collect(new Tuple2<>(vertex.getId(), sum));
+			out.collect(new Tuple2<>(vertex.getId(), sum - 1));
 		}
 	}
 
@@ -661,8 +653,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
+			out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
+			out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue() + 5));
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/3a300e63/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index b32abeb..6cc0b6a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -188,7 +188,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
 		}
 	}