You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/05 17:16:54 UTC

[2/4] flink git commit: [FLINK-4729] [gelly] Use optional VertexCentric CombineFunction

[FLINK-4729] [gelly] Use optional VertexCentric CombineFunction

Passes the CombineFunction through to VertexCentricIteration, and applies
other code cleanup discovered via IntelliJ's code analyzer.

This closes #2587


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb34133e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb34133e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb34133e

Branch: refs/heads/master
Commit: bb34133ed30b2a6c74baa0e5342e278e039cbe2a
Parents: 8c7c42f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Oct 3 13:59:38 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Oct 5 12:26:16 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Edge.java  |   2 +-
 .../main/java/org/apache/flink/graph/Graph.java | 165 +++++++++----------
 .../org/apache/flink/graph/GraphAlgorithm.java  |   2 +-
 .../org/apache/flink/graph/GraphCsvReader.java  |  10 +-
 .../flink/graph/IterationConfiguration.java     |   2 +-
 .../java/org/apache/flink/graph/Triplet.java    |   6 +-
 .../apache/flink/graph/VertexJoinFunction.java  |   5 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |   6 +-
 .../flink/graph/gsa/GSAConfiguration.java       |   6 +-
 .../apache/flink/graph/gsa/GatherFunction.java  |   6 +-
 .../graph/gsa/GatherSumApplyIteration.java      |  29 ++--
 .../org/apache/flink/graph/gsa/SumFunction.java |   6 +-
 .../library/GSASingleSourceShortestPaths.java   |   6 +-
 .../flink/graph/library/LabelPropagation.java   |   2 +-
 .../library/SingleSourceShortestPaths.java      |   2 +-
 .../flink/graph/library/TriangleEnumerator.java |   2 +-
 .../flink/graph/pregel/ComputeFunction.java     |  24 +--
 .../flink/graph/pregel/MessageCombiner.java     |  13 +-
 .../pregel/VertexCentricConfiguration.java      |   2 +-
 .../graph/pregel/VertexCentricIteration.java    |  45 +++--
 .../flink/graph/spargel/GatherFunction.java     |  10 +-
 .../flink/graph/spargel/ScatterFunction.java    |  12 +-
 .../graph/spargel/ScatterGatherIteration.java   |   5 +-
 .../validation/InvalidVertexIdsValidator.java   |   6 +-
 .../graph/asm/translate/TranslateTest.java      |   1 -
 .../test/GatherSumApplyConfigurationITCase.java |  33 ++--
 .../test/ScatterGatherConfigurationITCase.java  |  17 +-
 .../operations/GraphCreationWithCsvITCase.java  |   5 +-
 .../test/operations/GraphOperationsITCase.java  |  55 +++----
 .../ReduceOnNeighborMethodsITCase.java          |   5 +-
 30 files changed, 235 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index d84badb..2bcce29 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -46,7 +46,7 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
 	 * and the target is the original Edge's source.
 	 */
 	public Edge<K, V> reverse() {
-			return new Edge<K, V>(this.f1, this.f0, this.f2);
+			return new Edge<>(this.f1, this.f0, this.f2);
 	}
 
 	public void setSource(K src) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 821b0a7..02d1eeb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.Collector;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -160,7 +161,7 @@ public class Graph<K, VV, EV> {
 	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
 			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
 
-		return new Graph<K, VV, EV>(vertices, edges, context);
+		return new Graph<>(vertices, edges, context);
 	}
 
 	/**
@@ -177,15 +178,15 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
 
-		return new Graph<K, NullValue, EV>(vertices, edges, context);
+		return new Graph<>(vertices, edges, context);
 	}
 
 	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
 			Edge<K, EV>, Vertex<K, NullValue>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
-			out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
-			out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
+			out.collect(new Vertex<>(edge.f0, NullValue.getInstance()));
+			out.collect(new Vertex<>(edge.f1, NullValue.getInstance()));
 		}
 	}
 
@@ -216,19 +217,19 @@ public class Graph<K, VV, EV> {
 				.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
 				.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
 					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-						return new Vertex<K, VV>(value.f0, vertexValueInitializer.map(value.f0));
+						return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0));
 					}
 				}).returns(returnType).withForwardedFields("f0");
 
-		return new Graph<K, VV, EV>(vertices, edges, context);
+		return new Graph<>(vertices, edges, context);
 	}
 
 	private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
 		Edge<K, EV>, Tuple1<K>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-			out.collect(new Tuple1<K>(edge.f0));
-			out.collect(new Tuple1<K>(edge.f1));
+			out.collect(new Tuple1<>(edge.f0));
+			out.collect(new Tuple1<>(edge.f1));
 		}
 	}
 
@@ -316,7 +317,7 @@ public class Graph<K, VV, EV> {
 				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
 
 					public Edge<K, NullValue> map(Tuple2<K, K> input) {
-						return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+						return new Edge<>(input.f0, input.f1, NullValue.getInstance());
 					}
 		}).withForwardedFields("f0; f1");
 		return fromDataSet(edgeDataSet, context);
@@ -344,7 +345,7 @@ public class Graph<K, VV, EV> {
 				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
 
 					public Edge<K, NullValue> map(Tuple2<K, K> input) {
-						return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+						return new Edge<>(input.f0, input.f1, NullValue.getInstance());
 					}
 				}).withForwardedFields("f0; f1");
 		return fromDataSet(edgeDataSet, vertexValueInitializer, context);
@@ -472,7 +473,7 @@ public class Graph<K, VV, EV> {
 		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
 				throws Exception {
 
-			collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
+			collector.collect(new Tuple4<>(edge.getSource(), edge.getTarget(), vertex.getValue(),
 					edge.getValue()));
 		}
 	}
@@ -486,7 +487,7 @@ public class Graph<K, VV, EV> {
 		public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
 						Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
 
-			collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
+			collector.collect(new Triplet<>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
 					tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
 		}
 	}
@@ -521,13 +522,13 @@ public class Graph<K, VV, EV> {
 		DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
 				new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
 					public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
-						return new Vertex<K, NV>(value.f0, mapper.map(value));
+						return new Vertex<>(value.f0, mapper.map(value));
 					}
 				})
 				.returns(returnType)
 				.withForwardedFields("f0");
 
-		return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
+		return new Graph<>(mappedVertices, this.edges, this.context);
 	}
 
 	/**
@@ -596,14 +597,13 @@ public class Graph<K, VV, EV> {
 		DataSet<Edge<K, NV>> mappedEdges = edges.map(
 				new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
 					public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
-						return new Edge<K, NV>(value.f0, value.f1, mapper
-								.map(value));
+						return new Edge<>(value.f0, value.f1, mapper.map(value));
 					}
 				})
 				.returns(returnType)
 				.withForwardedFields("f0; f1");
 
-		return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
+		return new Graph<>(this.vertices, mappedEdges, this.context);
 	}
 
 	/**
@@ -628,7 +628,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
 				.coGroup(inputDataSet).where(0).equalTo(0)
 				.with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
-		return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
+		return new Graph<>(resultedVertices, this.edges, this.context);
 	}
 
 	private static final class ApplyCoGroupToVertexValues<K, VV, T>
@@ -651,7 +651,7 @@ public class Graph<K, VV, EV> {
 				if (inputIterator.hasNext()) {
 					final Tuple2<K, T> inputNext = inputIterator.next();
 
-					collector.collect(new Vertex<K, VV>(inputNext.f0, vertexJoinFunction
+					collector.collect(new Vertex<>(inputNext.f0, vertexJoinFunction
 							.vertexJoin(vertexIterator.next().f1, inputNext.f1)));
 				} else {
 					collector.collect(vertexIterator.next());
@@ -681,7 +681,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
 				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
@@ -704,7 +704,7 @@ public class Graph<K, VV, EV> {
 				if (inputIterator.hasNext()) {
 					final Tuple3<K, K, T> inputNext = inputIterator.next();
 
-					collector.collect(new Edge<K, EV>(inputNext.f0,
+					collector.collect(new Edge<>(inputNext.f0,
 							inputNext.f1, edgeJoinFunction.edgeJoin(
 									edgesIterator.next().f2, inputNext.f2)));
 				} else {
@@ -736,7 +736,7 @@ public class Graph<K, VV, EV> {
 				.coGroup(inputDataSet).where(0).equalTo(0)
 				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
@@ -761,7 +761,7 @@ public class Graph<K, VV, EV> {
 				while (edgesIterator.hasNext()) {
 					Edge<K, EV> edgesNext = edgesIterator.next();
 
-					collector.collect(new Edge<K, EV>(edgesNext.f0,
+					collector.collect(new Edge<>(edgesNext.f0,
 							edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
 				}
 
@@ -795,7 +795,7 @@ public class Graph<K, VV, EV> {
 				.coGroup(inputDataSet).where(1).equalTo(0)
 				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
+		return new Graph<>(this.vertices, resultedEdges, this.context);
 	}
 
 	/**
@@ -817,8 +817,7 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
 
-		return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
-				this.context);
+		return new Graph<>(filteredVertices, filteredEdges, this.context);
 	}
 
 	/**
@@ -837,7 +836,7 @@ public class Graph<K, VV, EV> {
 				.join(filteredVertices).where(1).equalTo(0)
 				.with(new ProjectEdge<K, VV, EV>());
 
-		return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
+		return new Graph<>(filteredVertices, remainingEdges, this.context);
 	}
 
 	/**
@@ -850,7 +849,7 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
 		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
 
-		return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context);
+		return new Graph<>(this.vertices, filteredEdges, this.context);
 	}
 
 	@ForwardedFieldsFirst("f0; f1; f2")
@@ -924,7 +923,7 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> getUndirected() {
 
 		DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
-		return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context);
+		return new Graph<>(vertices, undirectedEdges, this.context);
 	}
 
 	/**
@@ -947,13 +946,13 @@ public class Graph<K, VV, EV> {
 		switch (direction) {
 		case IN:
 			return vertices.coGroup(edges).where(0).equalTo(1)
-					.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
+					.with(new ApplyCoGroupFunction<>(edgesFunction));
 		case OUT:
 			return vertices.coGroup(edges).where(0).equalTo(0)
-					.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
+					.with(new ApplyCoGroupFunction<>(edgesFunction));
 		case ALL:
 			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
+					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction));
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -980,13 +979,13 @@ public class Graph<K, VV, EV> {
 		switch (direction) {
 			case IN:
 				return vertices.coGroup(edges).where(0).equalTo(1)
-						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
 			case OUT:
 				return vertices.coGroup(edges).where(0).equalTo(0)
-						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
 			case ALL:
 				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1013,14 +1012,14 @@ public class Graph<K, VV, EV> {
 		case IN:
 			return edges.map(new ProjectVertexIdMap<K, EV>(1))
 					.withForwardedFields("f1->f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
 		case OUT:
 			return edges.map(new ProjectVertexIdMap<K, EV>(0))
 					.withForwardedFields("f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
 		case ALL:
 			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction));
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1048,14 +1047,14 @@ public class Graph<K, VV, EV> {
 			case IN:
 				return edges.map(new ProjectVertexIdMap<K, EV>(1))
 						.withForwardedFields("f1->f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
 			case OUT:
 				return edges.map(new ProjectVertexIdMap<K, EV>(0))
 						.withForwardedFields("f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
 			case ALL:
 				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1072,7 +1071,7 @@ public class Graph<K, VV, EV> {
 
 		@SuppressWarnings("unchecked")
 		public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
-			return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition),	edge);
+			return new Tuple2<>((K) edge.getField(fieldPosition), edge);
 		}
 	}
 
@@ -1087,7 +1086,7 @@ public class Graph<K, VV, EV> {
 
 		@SuppressWarnings("unchecked")
 		public Tuple2<K, EV> map(Edge<K, EV> edge) {
-			return new Tuple2<K, EV>((K) edge.getField(fieldPosition),	edge.getValue());
+			return new Tuple2<>((K) edge.getField(fieldPosition), edge.getValue());
 		}
 	}
 
@@ -1114,8 +1113,8 @@ public class Graph<K, VV, EV> {
 		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
-			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
-			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
+			out.collect(new Tuple2<>(edge.getSource(), edge));
+			out.collect(new Tuple2<>(edge.getTarget(), edge));
 		}
 	}
 
@@ -1123,8 +1122,8 @@ public class Graph<K, VV, EV> {
 		Edge<K, EV>, Tuple2<K, EV>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
-			out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
-			out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
+			out.collect(new Tuple2<>(edge.getSource(), edge.getValue()));
+			out.collect(new Tuple2<>(edge.getTarget(), edge.getValue()));
 		}
 	}
 
@@ -1132,8 +1131,8 @@ public class Graph<K, VV, EV> {
 		Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
 
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
-			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
-			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
+			out.collect(new Tuple3<>(edge.getSource(), edge.getTarget(), edge));
+			out.collect(new Tuple3<>(edge.getTarget(), edge.getSource(), edge));
 		}
 	}
 
@@ -1224,7 +1223,7 @@ public class Graph<K, VV, EV> {
 			implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
 
 		public Edge<K, EV> map(Edge<K, EV> value) {
-			return new Edge<K, EV>(value.f1, value.f0, value.f2);
+			return new Edge<>(value.f1, value.f0, value.f2);
 		}
 	}
 
@@ -1233,8 +1232,8 @@ public class Graph<K, VV, EV> {
 
 		@Override
 		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
-			out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
-			out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
+			out.collect(new Edge<>(edge.f0, edge.f1, edge.f2));
+			out.collect(new Edge<>(edge.f1, edge.f0, edge.f2));
 		}
 	}
 
@@ -1246,7 +1245,7 @@ public class Graph<K, VV, EV> {
 	 */
 	public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
 		DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>());
-		return new Graph<K, VV, EV>(vertices, reversedEdges, this.context);
+		return new Graph<>(vertices, reversedEdges, this.context);
 	}
 
 	/**
@@ -1290,7 +1289,7 @@ public class Graph<K, VV, EV> {
 			implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
 		@Override
 		public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
-			return new Tuple2<K, K>(edge.f0, edge.f1);
+			return new Tuple2<>(edge.f0, edge.f1);
 		}
 	}
 
@@ -1302,7 +1301,7 @@ public class Graph<K, VV, EV> {
 	 * @return the new graph containing the existing vertices as well as the one just added
 	 */
 	public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
-		List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>();
+		List<Vertex<K, VV>> newVertex = new ArrayList<>();
 		newVertex.add(vertex);
 
 		return addVertices(newVertex);
@@ -1353,7 +1352,7 @@ public class Graph<K, VV, EV> {
 	 */
 	public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
 		Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
-				Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
+				Collections.singletonList(new Edge<>(source.f0, target.f0, edgeValue)),
 				this.context);
 		return this.union(partialGraph);
 	}
@@ -1408,7 +1407,7 @@ public class Graph<K, VV, EV> {
 	 */
 	public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
 
-		List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<Vertex<K, VV>>();
+		List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<>();
 		vertexToBeRemoved.add(vertex);
 
 		return removeVertices(vertexToBeRemoved);
@@ -1445,7 +1444,7 @@ public class Graph<K, VV, EV> {
 				.join(newVertices).where(1).equalTo(0)
 				.with(new ProjectEdge<K, VV, EV>());
 
-		return new Graph<K, VV, EV>(newVertices, newEdges, context);
+		return new Graph<>(newVertices, newEdges, context);
 	}
 
 	private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
@@ -1515,7 +1514,7 @@ public class Graph<K, VV, EV> {
 		DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
 				.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
 
-		return new Graph<K, VV, EV>(this.vertices, newEdges, context);
+		return new Graph<>(this.vertices, newEdges, context);
 	}
 
 	private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
@@ -1541,7 +1540,7 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
 		DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
 		DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
-		return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
+		return new Graph<>(unionedVertices, unionedEdges, this.context);
 	}
 
 	/**
@@ -1689,7 +1688,7 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
 
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+		return new Graph<>(newVertices, this.edges, this.context);
 	}
 
 	/**
@@ -1706,7 +1705,7 @@ public class Graph<K, VV, EV> {
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
+			org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
 			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 
 		return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
@@ -1727,7 +1726,7 @@ public class Graph<K, VV, EV> {
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction,
+			org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
 			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
 			GSAConfiguration parameters) {
 
@@ -1738,7 +1737,7 @@ public class Graph<K, VV, EV> {
 
 		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
 
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+		return new Graph<>(newVertices, this.edges, this.context);
 	}
 
 	/**
@@ -1776,10 +1775,10 @@ public class Graph<K, VV, EV> {
 			VertexCentricConfiguration parameters) {
 
 		VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges(
-				edges, computeFunction, maximumNumberOfIterations);
+				edges, computeFunction, combiner, maximumNumberOfIterations);
 		iteration.configure(parameters);
 		DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+		return new Graph<>(newVertices, this.edges, this.context);
 	}
 
 	/**
@@ -1831,14 +1830,14 @@ public class Graph<K, VV, EV> {
 					.join(this.vertices).where(0).equalTo(0);
 			return vertices.coGroup(edgesWithSources)
 					.where(0).equalTo("f0.f1")
-					.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
 		case OUT:
 			// create <edge-targetVertex> pairs
 			DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
 					.join(this.vertices).where(1).equalTo(0);
 			return vertices.coGroup(edgesWithTargets)
 					.where(0).equalTo("f0.f0")
-					.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+					.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction));
 		case ALL:
 			// create <edge-sourceOrTargetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1848,7 +1847,7 @@ public class Graph<K, VV, EV> {
 
 			return vertices.coGroup(edgesWithNeighbors)
 					.where(0).equalTo(0)
-					.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
+					.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction));
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1879,14 +1878,14 @@ public class Graph<K, VV, EV> {
 						.join(this.vertices).where(0).equalTo(0);
 				return vertices.coGroup(edgesWithSources)
 						.where(0).equalTo("f0.f1")
-						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
 			case OUT:
 				// create <edge-targetVertex> pairs
 				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
 						.join(this.vertices).where(1).equalTo(0);
 				return vertices.coGroup(edgesWithTargets)
 						.where(0).equalTo("f0.f0")
-						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
 			case ALL:
 				// create <edge-sourceOrTargetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1896,7 +1895,7 @@ public class Graph<K, VV, EV> {
 
 				return vertices.coGroup(edgesWithNeighbors)
 						.where(0).equalTo(0)
-						.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+						.with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1927,7 +1926,7 @@ public class Graph<K, VV, EV> {
 					.with(new ProjectVertexIdJoin<K, VV, EV>(1))
 					.withForwardedFieldsFirst("f1->f0");
 			return edgesWithSources.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
 		case OUT:
 			// create <edge-targetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
@@ -1935,7 +1934,7 @@ public class Graph<K, VV, EV> {
 					.with(new ProjectVertexIdJoin<K, VV, EV>(0))
 					.withForwardedFieldsFirst("f0");
 			return edgesWithTargets.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
 		case ALL:
 			// create <edge-sourceOrTargetVertex> pairs
 			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1944,7 +1943,7 @@ public class Graph<K, VV, EV> {
 					.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
 
 			return edgesWithNeighbors.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
+				new ApplyNeighborGroupReduceFunction<>(neighborsFunction));
 		default:
 			throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -1976,7 +1975,7 @@ public class Graph<K, VV, EV> {
 						.with(new ProjectVertexIdJoin<K, VV, EV>(1))
 						.withForwardedFieldsFirst("f1->f0");
 				return edgesWithSources.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
 			case OUT:
 				// create <edge-targetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
@@ -1984,7 +1983,7 @@ public class Graph<K, VV, EV> {
 						.with(new ProjectVertexIdJoin<K, VV, EV>(0))
 						.withForwardedFieldsFirst("f0");
 				return edgesWithTargets.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
 			case ALL:
 				// create <edge-sourceOrTargetVertex> pairs
 				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
@@ -1993,7 +1992,7 @@ public class Graph<K, VV, EV> {
 						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
 
 				return edgesWithNeighbors.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
+					new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
@@ -2031,7 +2030,7 @@ public class Graph<K, VV, EV> {
 		@SuppressWarnings("unchecked")
 		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
 				Collector<Tuple2<K, VV>> out) {
-			out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue()));
+			out.collect(new Tuple2<>((K) edge.getField(fieldPosition), otherVertex.getValue()));
 		}
 	}
 
@@ -2047,7 +2046,7 @@ public class Graph<K, VV, EV> {
 		@SuppressWarnings("unchecked")
 		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
 						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
+			out.collect(new Tuple3<>((K) edge.getField(fieldPosition), edge, otherVertex));
 		}
 	}
 
@@ -2059,7 +2058,7 @@ public class Graph<K, VV, EV> {
 		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
 				Collector<Tuple2<K, VV>> out) {
 
-			out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
+			out.collect(new Tuple2<>(keysWithEdge.f0, neighbor.getValue()));
 		}
 	}
 
@@ -2070,7 +2069,7 @@ public class Graph<K, VV, EV> {
 
 		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
 						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
+			out.collect(new Tuple3<>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
 		}
 	}
 
@@ -2119,7 +2118,7 @@ public class Graph<K, VV, EV> {
 				@Override
 				public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
 					Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next();
-					return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2);
+					return new Tuple2<>(next.f1, next.f2);
 				}
 
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index 08cf011..8ec9650 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -26,5 +26,5 @@ package org.apache.flink.graph;
  */
 public interface GraphAlgorithm<K, VV, EV, T> {
 
-	public T run(Graph<K, VV, EV> input) throws Exception;
+	T run(Graph<K, VV, EV> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index f93abc4..4859e12 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -104,7 +104,7 @@ public class GraphCsvReader {
 	public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
 			Class<EV> edgeValue) {
 
-		DataSet<Tuple2<K, VV>> vertices = null;
+		DataSet<Tuple2<K, VV>> vertices;
 
 		if (edgeReader == null) {
 			throw new RuntimeException("The edges input file cannot be null!");
@@ -160,9 +160,9 @@ public class GraphCsvReader {
 					private static final long serialVersionUID = -2981792951286476970L;
 
 					public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
-						return new Tuple3<K, K, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+						return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
 					}
-				}).withForwardedFields("f0;f1");;
+				}).withForwardedFields("f0;f1");
 
 		return Graph.fromTupleDataSet(edges, executionContext);
 	}
@@ -179,7 +179,7 @@ public class GraphCsvReader {
 	@SuppressWarnings({ "serial", "unchecked" })
 	public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
 		
-		DataSet<Tuple2<K, VV>> vertices = null;
+		DataSet<Tuple2<K, VV>> vertices;
 
 		if (edgeReader == null) {
 			throw new RuntimeException("The edges input file cannot be null!");
@@ -189,7 +189,7 @@ public class GraphCsvReader {
 				.map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
 
 					public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
-						return new Tuple3<K, K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+						return new Tuple3<>(input.f0, input.f1, NullValue.getInstance());
 					}
 				}).withForwardedFields("f0;f1");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 0b98a27..964d20e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -37,7 +37,7 @@ public abstract class IterationConfiguration {
 	private int parallelism = -1;
 
 	/** the iteration aggregators **/
-	private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+	private Map<String, Aggregator<?>> aggregators = new HashMap<>();
 
 	/** flag that defines whether the solution set is kept in managed memory **/
 	private boolean unmanagedSolutionSet = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index dee3480..2ae0903 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -64,14 +64,14 @@ public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
 	}
 
 	public Vertex<K, VV> getSrcVertex() {
-		return new Vertex<K, VV>(this.f0, this.f2);
+		return new Vertex<>(this.f0, this.f2);
 	}
 
 	public Vertex<K, VV> getTrgVertex() {
-		return new Vertex<K, VV>(this.f1, this.f3);
+		return new Vertex<>(this.f1, this.f3);
 	}
 
 	public Edge<K, EV> getEdge() {
-		return new Edge<K, EV>(this.f0, this.f1, this.f4);
+		return new Edge<>(this.f0, this.f1, this.f4);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
index a30d1a2..f40dac9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.DataSet;
+
+import java.io.Serializable;
 
 /**
  * Interface to be implemented by the transformation function

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index e9add7c..f05c254 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -105,7 +105,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
 	 */
 	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 
 	/**
@@ -115,7 +115,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 
 	/**
@@ -126,7 +126,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 	 * @return The broadcast data set.
 	 */
 	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index 8d24f16..079b4c7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -39,13 +39,13 @@ import java.util.List;
 public class GSAConfiguration extends IterationConfiguration {
 
 	/** the broadcast variables for the gather function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<Tuple2<String,DataSet<?>>>();
+	private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>();
 
 	/** the broadcast variables for the sum function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<Tuple2<String,DataSet<?>>>();
+	private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<>();
 
 	/** the broadcast variables for the apply function **/
-	private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>();
+	private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<>();
 
 	private EdgeDirection direction = EdgeDirection.OUT;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index d914f2a..90db9da 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -94,7 +94,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
 	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
 	 */
 	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 
 	/**
@@ -104,7 +104,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 
 	/**
@@ -115,7 +115,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
 	 * @return The broadcast data set.
 	 */
 	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index d1b12f9..442ce68 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.types.LongValue;
@@ -119,13 +118,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		// Prepare type information
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
 		TypeInformation<M> messageType = TypeExtractor.createTypeInfo(gather, GatherFunction.class, gather.getClass(), 2);
-		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<>(keyType, messageType);
 		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
 
-		// create a graph
-		Graph<K, VV, EV> graph =
-				Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
-
 		// check whether the numVertices option is set and, if so, compute the total number of vertices
 		// and set it within the gather, sum and apply functions
 
@@ -139,9 +134,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		}
 
 		// Prepare UDFs
-		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
-		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
-		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<>(gather, innerType);
+		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<>(sum, innerType);
+		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<>(apply, outputType);
 
 		final int[] zeroKeyPos = new int[] {0};
 		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
@@ -268,11 +263,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 	 *
 	 * @return An in stance of the gather-sum-apply graph computation operator.
 	 */
-	public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+	public static <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
 		withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
 		SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
 
-		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+		return new GatherSumApplyIteration<>(gather, sum, apply, edges, maximumNumberOfIterations);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -295,7 +290,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		@Override
 		public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
 			M result = this.gatherFunction.gather(neighborTuple.f1);
-			return new Tuple2<K, M>(neighborTuple.f0, result);
+			return new Tuple2<>(neighborTuple.f0, result);
 		}
 
 		@Override
@@ -337,7 +332,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
 			K key = arg0.f0;
 			M result = this.sumFunction.sum(arg0.f1, arg1.f1);
-			return new Tuple2<K, M>(key, result);
+			return new Tuple2<>(key, result);
 		}
 
 		@Override
@@ -411,8 +406,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
 
 		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
-					edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+			out.collect(new Tuple2<>(
+					edge.getTarget(), new Neighbor<>(vertex.getValue(), edge.getValue())));
 		}
 	}
 
@@ -422,8 +417,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
 
 		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
-					edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+			out.collect(new Tuple2<>(
+					edge.getSource(), new Neighbor<>(vertex.getValue(), edge.getValue())));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 68e8d27..e70af1f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -94,7 +94,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
 	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
 	 */
 	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 
 	/**
@@ -104,7 +104,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 
 	/**
@@ -115,7 +115,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
 	 * @return The broadcast data set.
 	 */
 	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index f39d858..4656757 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class GSASingleSourceShortestPaths<K> implements
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
-		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+		return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
 				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
 						new UpdateDistance<K>(), maxIterations)
 						.getVertices();
@@ -85,7 +85,7 @@ public class GSASingleSourceShortestPaths<K> implements
 		public Double gather(Neighbor<Double, Double> neighbor) {
 			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -93,7 +93,7 @@ public class GSASingleSourceShortestPaths<K> implements
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2d13dfd..96e5afc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -112,7 +112,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV>
 	public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> {
 
 		public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) {
-			Map<VV, Long> labelsWithFrequencies = new HashMap<VV, Long>();
+			Map<VV, Long> labelsWithFrequencies = new HashMap<>();
 
 			long maxFrequency = 1;
 			VV mostFrequentLabel = vertex.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 4ff4e79..5aa1ac1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
-		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+		return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
 				.runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
 				maxIterations).getVertices();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 681d060..dabeb06 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -110,7 +110,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 	private static final class DegreeCounter<K extends Comparable<K>, EV>
 			implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
 
-		final ArrayList<K> otherVertices = new ArrayList<K>();
+		final ArrayList<K> otherVertices = new ArrayList<>();
 		final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>();
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index db64f63..08c15e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.DataSet;
@@ -33,6 +29,10 @@ import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
 /**
  * The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}.
  * 
@@ -86,7 +86,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	 */
 	public final Iterable<Edge<K, EV>> getEdges() {
 		verifyEdgeUsage();
-		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+		this.edgeIterator.set(edges);
 		return this.edgeIterator;
 	}
 
@@ -100,7 +100,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 		verifyEdgeUsage();
 		outMsg.setField(m, 1);
 		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
+			Tuple next = edges.next();
 			outMsg.setField(next.getField(1), 0);
 			out.collect(Either.Right(outMsg));
 		}
@@ -157,7 +157,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	 * @return The aggregator registered under this name, or {@code null}, if no aggregator was registered.
 	 */
 	public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 	
 	/**
@@ -167,7 +167,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public final <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 	
 	/**
@@ -179,7 +179,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	 * @return The broadcast data set.
 	 */
 	public final <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -204,9 +204,9 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
-		this.outVertex = new Vertex<K, VV>();
-		this.outMsg = new Tuple2<K, Message>();
-		this.edgeIterator = new EdgesIterator<K, EV>();
+		this.outVertex = new Vertex<>();
+		this.outMsg = new Tuple2<>();
+		this.edgeIterator = new EdgesIterator<>();
 	}
 	
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 70c8262..9398d8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
- * The base class for combining messages sent during a {@link VertexCentricteration}.
+ * The base class for combining messages sent during a {@link VertexCentricIteration}.
  * 
  * @param <K> The type of the vertex id
  * @param <Message> The type of the message sent between vertices along the edges.
@@ -37,15 +37,12 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
 
 	private Collector<Tuple2<K, Either<NullValue, Message>>> out;
 
-	private K vertexId;
-
 	private Tuple2<K, Either<NullValue, Message>> outValue;
 
 	void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) {
-		this.vertexId = target;
 		this.out = collector;
-		this.outValue = new Tuple2<K, Either<NullValue, Message>>();
-		outValue.setField(vertexId, 0);
+		this.outValue = new Tuple2<>();
+		outValue.setField(target, 0);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 026e869..a0f793a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -37,7 +37,7 @@ import java.util.List;
 public class VertexCentricConfiguration extends IterationConfiguration {
 
 	/** the broadcast variables for the compute function **/
-	private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<Tuple2<String,DataSet<?>>>();
+	private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>();
 
 	public VertexCentricConfiguration() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 8d96776..ebf2e8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -18,24 +18,21 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -50,6 +47,9 @@ import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * This class represents iterative graph computations, programmed in a vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
@@ -158,14 +158,14 @@ public class VertexCentricIteration<K, VV, EV, Message>
 		// prepare the type information
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
 		TypeInformation<Tuple2<K, Message>> messageTypeInfo =
-			new TupleTypeInfo<Tuple2<K, Message>>(keyType, messageType);
+			new TupleTypeInfo<>(keyType, messageType);
 		TypeInformation<Vertex<K, VV>> vertexType = initialVertices.getType();
 		TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo =
-			new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo);
+			new EitherTypeInfo<>(vertexType, messageTypeInfo);
 		TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo =
-				new EitherTypeInfo<NullValue, Message>(TypeExtractor.getForClass(NullValue.class), messageType);
+			new EitherTypeInfo<>(TypeExtractor.getForClass(NullValue.class), messageType);
 		TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo =
-			new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo);
+			new TupleTypeInfo<>(keyType, nullableMsgTypeInfo);
 
 		DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices.map(
 				new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);
@@ -183,7 +183,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 						vertexType, nullableMsgTypeInfo));
 
 		VertexComputeUdf<K, VV, EV, Message> vertexUdf =
-				new VertexComputeUdf<K, VV, EV, Message>(computeFunction, intermediateTypeInfo); 
+			new VertexComputeUdf<>(computeFunction, intermediateTypeInfo);
 
 		CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation =
 				verticesWithMsgs.coGroup(edgesWithValue)
@@ -204,7 +204,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 		if (combineFunction != null) {
 
 			MessageCombinerUdf<K, Message> combinerUdf =
-					new MessageCombinerUdf<K, Message>(combineFunction, workSetTypeInfo);
+				new MessageCombinerUdf<>(combineFunction, workSetTypeInfo);
 
 			DataSet<Tuple2<K, Either<NullValue, Message>>> combinedMessages = allMessages
 					.groupBy(0).reduceGroup(combinerUdf)
@@ -237,12 +237,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
 	 * 
 	 * @return An instance of the vertex-centric graph computation operator.
 	 */
-	public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+	public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
 		DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
 		int maximumNumberOfIterations) {
 
-		return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, null,
-				maximumNumberOfIterations);
+		return new VertexCentricIteration<>(cf, edgesWithValue, null,
+			maximumNumberOfIterations);
 	}
 
 	/**
@@ -260,12 +260,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
 	 * 
 	 * @return An instance of the vertex-centric graph computation operator.
 	 */
-	public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
+	public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges(
 		DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf,
 		MessageCombiner<K, Message> mc, int maximumNumberOfIterations) {
 
-		return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, mc,
-				maximumNumberOfIterations);
+		return new VertexCentricIteration<>(cf, edgesWithValue, mc,
+			maximumNumberOfIterations);
 	}
 
 	/**
@@ -297,7 +297,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
 		@Override
 		public void open(Configuration parameters) {
-			outTuple = new Tuple2<K, Either<NullValue, Message>>();
+			outTuple = new Tuple2<>();
 			nullMessage = Either.Left(NullValue.getInstance());
 			outTuple.setField(nullMessage, 1);
 		}
@@ -308,12 +308,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
 		}
 }
 	
-	@SuppressWarnings("serial")
 	/**
 	 * This coGroup class wraps the user-defined compute function.
 	 * The first input holds a Tuple2 containing the vertex state and its inbox.
 	 * The second input is an iterator of the out-going edges of this vertex.
 	 */
+	@SuppressWarnings("serial")
 	private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction<
 		Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
 		Either<Vertex<K, VV>, Tuple2<K, Message>>>
@@ -469,8 +469,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 			JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>,
 					Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> {
 
-		private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple =
-				new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>();
+		private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple = new Tuple2<>();
 
 		public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(
 				Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) {
@@ -498,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 	private static final class ProjectMessages<K, VV, Message> implements
 			FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> {
 
-		private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<K, Either<NullValue, Message>>();
+		private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<>();
 
 		public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value,
 				Collector<Tuple2<K, Either<NullValue, Message>>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
index d56c0da..93b3a8c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -141,7 +141,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
 	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
 	 */
 	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 
 	/**
@@ -151,7 +151,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 
 	/**
@@ -163,7 +163,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
 	 * @return The broadcast data set.
 	 */
 	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -243,8 +243,8 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable {
 	<VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
 												MessageIterator<Message> inMessages) throws Exception {
 
-		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
-				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+		Vertex<K, VV> vertex = new Vertex<>(vertexState.f0,
+			((Tuple3<VV, Long, Long>) vertexState.getValue()).f0);
 
 		updateVertex(vertex, inMessages);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
index 336e73d..b99b5b7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -217,7 +217,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
 	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
 	 */
 	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
+		return this.runtimeContext.getIterationAggregator(name);
 	}
 
 	/**
@@ -227,7 +227,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
 
 	/**
@@ -239,7 +239,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
 	 * @return The broadcast data set.
 	 */
 	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
+		return this.runtimeContext.getBroadcastVariable(name);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -266,8 +266,8 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
 
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
-		this.outValue = new Tuple2<K, Message>();
-		this.edgeIterator = new EdgesIterator<K, EV>();
+		this.outValue = new Tuple2<>();
+		this.edgeIterator = new EdgesIterator<>();
 	}
 
 	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
@@ -282,7 +282,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl
 	{
 		private Iterator<Edge<K, EV>> input;
 
-		private Edge<K, EV> edge = new Edge<K, EV>();
+		private Edge<K, EV> edge = new Edge<>();
 
 		void set(Iterator<Edge<K, EV>> input) {
 			this.input = input;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fde305f..8049932 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -196,7 +196,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * 
 	 * @return An in stance of the scatter-gather graph computation operator.
 	 */
-	public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
+	public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges(
 		DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf,
 		GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations)
 	{
@@ -588,8 +588,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 
-		GatherUdf<K, VV, Message> updateUdf =
-				new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes);
+		GatherUdf<K, VV, Message> updateUdf = new GatherUdfSimpleVV<>(gatherFunction, vertexTypes);
 
 		// build the update function (co group)
 		CoGroupOperator<?, ?, Vertex<K, VV>> updates =

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 33d469b..b620dd8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -50,8 +50,8 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
 
 	private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-			out.collect(new Tuple1<K>(edge.f0));
-			out.collect(new Tuple1<K>(edge.f1));
+			out.collect(new Tuple1<>(edge.f0));
+			out.collect(new Tuple1<>(edge.f1));
 		}
 	}
 
@@ -67,7 +67,7 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
 
 	private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
 		public Tuple1<K> map(K key) throws Exception {
-			return new Tuple1<K>(key);
+			return new Tuple1<>(key);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index 0cf026c..ba6bd05 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -33,7 +33,6 @@ import org.junit.Test;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
 import static org.junit.Assert.assertEquals;
 
 public class TranslateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 1e44d5b..98ecf16 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
@@ -240,10 +239,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("gatherBcastSet");
-			Assert.assertEquals(1, bcastSet.get(0));
-			Assert.assertEquals(2, bcastSet.get(1));
-			Assert.assertEquals(3, bcastSet.get(2));
+			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0).intValue());
+			Assert.assertEquals(2, bcastSet.get(1).intValue());
+			Assert.assertEquals(3, bcastSet.get(2).intValue());
 
 			// test aggregator
 			if (getSuperstepNumber() == 2) {
@@ -271,10 +270,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("sumBcastSet");
-			Assert.assertEquals(4, bcastSet.get(0));
-			Assert.assertEquals(5, bcastSet.get(1));
-			Assert.assertEquals(6, bcastSet.get(2));
+			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0).intValue());
+			Assert.assertEquals(5, bcastSet.get(1).intValue());
+			Assert.assertEquals(6, bcastSet.get(2).intValue());
 
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");
@@ -286,7 +285,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 		public Long sum(Long newValue, Long currentValue) {
 			long superstep = getSuperstepNumber();
 			aggregator.aggregate(superstep);
-			return 0l;
+			return 0L;
 		}
 	}
 
@@ -300,10 +299,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("applyBcastSet");
-			Assert.assertEquals(7, bcastSet.get(0));
-			Assert.assertEquals(8, bcastSet.get(1));
-			Assert.assertEquals(9, bcastSet.get(2));
+			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet");
+			Assert.assertEquals(7, bcastSet.get(0).intValue());
+			Assert.assertEquals(8, bcastSet.get(1).intValue());
+			Assert.assertEquals(9, bcastSet.get(2).intValue());
 
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");
@@ -338,7 +337,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	private static final class DummySum extends SumFunction<Long, Long, Long> {
 
 		public Long sum(Long newValue, Long currentValue) {
-			return 0l;
+			return 0L;
 		}
 	}
 
@@ -354,7 +353,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
 		public Long map(Vertex<Long, Long> value) {
-			return 1l;
+			return 1L;
 		}
 	}
 
@@ -363,7 +362,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 		@Override
 		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
-			HashSet<Long> h = new HashSet<Long>();
+			HashSet<Long> h = new HashSet<>();
 			h.add(value.getId());
 			return h;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index fcd0d82..2213700 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
@@ -521,10 +520,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
-			Assert.assertEquals(4, bcastSet.get(0));
-			Assert.assertEquals(5, bcastSet.get(1));
-			Assert.assertEquals(6, bcastSet.get(2));
+			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0).intValue());
+			Assert.assertEquals(5, bcastSet.get(1).intValue());
+			Assert.assertEquals(6, bcastSet.get(2).intValue());
 
 			// test number of vertices
 			Assert.assertEquals(5, getNumberOfVertices());
@@ -569,10 +568,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
-			Assert.assertEquals(1, bcastSet.get(0));
-			Assert.assertEquals(2, bcastSet.get(1));
-			Assert.assertEquals(3, bcastSet.get(2));
+			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0).intValue());
+			Assert.assertEquals(2, bcastSet.get(1).intValue());
+			Assert.assertEquals(3, bcastSet.get(2).intValue());
 
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
index 99c66ec..3ccdef0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.test.operations;
 
-import com.google.common.base.Charsets;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -30,10 +29,12 @@ import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -193,7 +194,7 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 		tempFile.deleteOnExit();
 
 		OutputStreamWriter wrt = new OutputStreamWriter(
-				new FileOutputStream(tempFile), Charsets.UTF_8
+				new FileOutputStream(tempFile), Charset.forName("UTF-8")
 		);
 		wrt.write(content);
 		wrt.close();