You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/01/17 21:35:46 UTC

[3/6] flink git commit: [hotfix] [gelly] Improve generic type formatting

[hotfix] [gelly] Improve generic type formatting

Add spacing between type parameters.

For example, 'Vertex<K,VV>' has been updated to 'Vertex<K, VV>'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53716a4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53716a4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53716a4d

Branch: refs/heads/master
Commit: 53716a4da1732ee66c7906772bd5bfcd2218b3e1
Parents: b408d61
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Jan 17 14:25:37 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jan 17 15:38:24 2017 -0500

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/Graph500.java    |  2 +-
 .../graph/examples/data/TriangleCountData.java  | 10 ++--
 .../graph/library/TriangleEnumeratorITCase.java |  6 +--
 .../test/examples/IncrementalSSSPITCase.java    |  2 +-
 .../main/java/org/apache/flink/graph/Graph.java | 30 +++++------
 .../asm/translate/TranslateEdgeValues.java      |  2 +-
 .../graph/asm/translate/TranslateGraphIds.java  |  2 +-
 .../graph/generator/AbstractGraphGenerator.java |  2 +-
 .../flink/graph/generator/CompleteGraph.java    | 12 ++---
 .../flink/graph/generator/CycleGraph.java       |  2 +-
 .../flink/graph/generator/EmptyGraph.java       | 10 ++--
 .../flink/graph/generator/GraphGenerator.java   |  4 +-
 .../graph/generator/GraphGeneratorUtils.java    | 18 +++----
 .../apache/flink/graph/generator/GridGraph.java | 20 +++----
 .../apache/flink/graph/generator/PathGraph.java |  2 +-
 .../apache/flink/graph/generator/RMatGraph.java | 14 ++---
 .../graph/generator/SingletonEdgeGraph.java     | 12 ++---
 .../apache/flink/graph/generator/StarGraph.java | 14 ++---
 .../flink/graph/library/TriangleEnumerator.java | 10 ++--
 .../directed/LocalClusteringCoefficient.java    |  2 +-
 .../undirected/LocalClusteringCoefficient.java  |  4 +-
 .../graph/pregel/VertexCentricIteration.java    |  2 +-
 .../graph/spargel/ScatterGatherIteration.java   |  4 +-
 .../proxy/GraphAlgorithmWrappingDataSet.java    |  2 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java | 16 +++---
 .../graph/asm/simple/directed/SimplifyTest.java |  4 +-
 .../asm/simple/undirected/SimplifyTest.java     |  6 +--
 .../graph/asm/translate/TranslateTest.java      |  2 +-
 .../graph/generator/CompleteGraphTest.java      | 10 ++--
 .../flink/graph/generator/CycleGraphTest.java   | 10 ++--
 .../flink/graph/generator/EmptyGraphTest.java   | 10 ++--
 .../flink/graph/generator/GridGraphTest.java    |  6 +--
 .../graph/generator/HypercubeGraphTest.java     |  6 +--
 .../flink/graph/generator/PathGraphTest.java    | 10 ++--
 .../flink/graph/generator/RMatGraphTest.java    |  8 +--
 .../graph/generator/SingletonEdgeGraphTest.java | 10 ++--
 .../flink/graph/generator/StarGraphTest.java    | 10 ++--
 .../apache/flink/graph/generator/TestUtils.java |  6 +--
 .../graph/library/link_analysis/HITSTest.java   |  4 +-
 .../flink/graph/pregel/PregelCompilerTest.java  | 24 ++++-----
 .../graph/pregel/PregelTranslationTest.java     | 16 +++---
 .../graph/spargel/SpargelCompilerTest.java      | 24 ++++-----
 .../graph/spargel/SpargelTranslationTest.java   |  4 +-
 .../test/ScatterGatherConfigurationITCase.java  |  6 +--
 .../apache/flink/graph/test/TestGraphUtils.java | 20 +++----
 .../test/operations/FromCollectionITCase.java   | 13 ++---
 .../test/operations/GraphCreationITCase.java    | 18 +++----
 .../operations/GraphCreationWithCsvITCase.java  | 16 +++---
 .../GraphCreationWithMapperITCase.java          | 16 +++---
 .../test/operations/GraphMutationsITCase.java   | 56 ++++++++++----------
 .../test/operations/GraphOperationsITCase.java  | 20 +++----
 .../test/operations/JoinWithEdgesITCase.java    | 34 ++++++------
 .../test/operations/JoinWithVerticesITCase.java | 24 ++++-----
 .../operations/ReduceOnEdgesMethodsITCase.java  | 36 ++++++-------
 .../ReduceOnNeighborMethodsITCase.java          | 30 +++++------
 55 files changed, 332 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..3509d2e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -103,7 +103,7 @@ public class Graph500 {
 			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 		}
 
-		DataSet<Tuple2<LongValue,LongValue>> edges = graph
+		DataSet<Tuple2<LongValue, LongValue>> edges = graph
 			.getEdges()
 			.project(0, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
index a14010f..cf3a715 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -53,11 +53,11 @@ public class TriangleCountData {
 
 	public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
 
-	public static List<Tuple3<Long,Long,Long>> getListOfTriangles()	{
-		ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
-		ret.add(new Tuple3<>(1L,2L,3L));
-		ret.add(new Tuple3<>(2L,3L,6L));
-		ret.add(new Tuple3<>(4L,3L,5L));
+	public static List<Tuple3<Long, Long, Long>> getListOfTriangles()	{
+		ArrayList<Tuple3<Long, Long, Long>> ret = new ArrayList<>(3);
+		ret.add(new Tuple3<>(1L, 2L, 3L));
+		ret.add(new Tuple3<>(2L, 3L, 6L));
+		ret.add(new Tuple3<>(4L, 3L, 5L));
 		return ret;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
index 9550405..176a7e1 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -46,11 +46,11 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
 		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
 				env);
 
-		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
-		List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();
+		List<Tuple3<Long, Long, Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+		List<Tuple3<Long, Long, Long>> expectedResult = TriangleCountData.getListOfTriangles();
 
 		Assert.assertEquals(expectedResult.size(), actualOutput.size());
-		for(Tuple3<Long,Long,Long> resultTriangle:actualOutput)	{
+		for(Tuple3<Long, Long, Long> resultTriangle:actualOutput)	{
 			Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
index 24b8cf1..de92666 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -82,7 +82,7 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
 	public void testIncrementalSSSP() throws Exception {
 		IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
 				IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
-				IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
+				IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
 		expected = IncrementalSSSPData.RESULTED_VERTICES;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 093b804..dae7a11 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -544,7 +544,7 @@ public class Graph<K, VV, EV> {
 	 * @param returnType the explicit return type.
 	 * @return a new graph
 	 */
-	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
+	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
 		DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
 				new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
 					private Vertex<K, NV> output = new Vertex<>();
@@ -588,7 +588,7 @@ public class Graph<K, VV, EV> {
 	 * @param returnType the explicit return type.
 	 * @return a new graph
 	 */
-	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
+	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K, NV>> returnType) {
 		DataSet<Edge<K, NV>> mappedEdges = edges.map(
 			new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
 				private Edge<K, NV> output = new Edge<>();
@@ -924,7 +924,7 @@ public class Graph<K, VV, EV> {
 		private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, degree);
 
 		@SuppressWarnings("unused")
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,	Iterable<Edge<K, EV>> outEdges,
+		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
 				Collector<Tuple2<K, LongValue>> out) {
 			long count = 0;
 			for (Edge<K, EV> edge : outEdges) {
@@ -1217,7 +1217,7 @@ public class Graph<K, VV, EV> {
 			this.function = fun;
 		}
 
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,	final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
+		public void coGroup(Iterable<Vertex<K, VV>> vertex, 	final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
 				Collector<T> out) throws Exception {
 
 			final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K, EV>>() {
@@ -1416,9 +1416,9 @@ public class Graph<K, VV, EV> {
 	 */
 	public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
 
-		DataSet<Edge<K,EV>> newEdgesDataSet = this.context.fromCollection(newEdges);
+		DataSet<Edge<K, EV>> newEdgesDataSet = this.context.fromCollection(newEdges);
 
-		DataSet<Edge<K,EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
+		DataSet<Edge<K, EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
 				.where(0).equalTo(0)
 				.with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>()).name("Join with source")
 				.join(this.getVertices()).where(1).equalTo(0)
@@ -1516,7 +1516,7 @@ public class Graph<K, VV, EV> {
 	}
 
 	@ForwardedFieldsSecond("f0; f1; f2")
-	private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
+	private static final class ProjectEdgeToBeRemoved<K, VV, EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
 		@Override
 		public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
 			return edge;
@@ -1559,12 +1559,12 @@ public class Graph<K, VV, EV> {
 	public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
 
 		DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
-				.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
+				.where(0, 1).equalTo(0, 1).with(new EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
 
 		return new Graph<>(this.vertices, newEdges, context);
 	}
 
-	private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
+	private static final class EdgeRemovalCoGroup<K, EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
 
 		@Override
 		public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
@@ -1608,8 +1608,8 @@ public class Graph<K, VV, EV> {
 	 * @param graph the graph to perform difference with
 	 * @return a new graph where the common vertices and edges have been removed
 	 */
-	public Graph<K,VV,EV> difference(Graph<K,VV,EV> graph) {
-		DataSet<Vertex<K,VV>> removeVerticesData = graph.getVertices();
+	public Graph<K, VV, EV> difference(Graph<K, VV, EV> graph) {
+		DataSet<Vertex<K, VV>> removeVerticesData = graph.getVertices();
 		return this.removeVertices(removeVerticesData);
 	}
 
@@ -1688,7 +1688,7 @@ public class Graph<K, VV, EV> {
 	 * @param <EV> 	edge value type
 	 */
 	private static final class MatchingEdgeReducer<K, EV>
-			implements CoGroupFunction<Edge<K,EV>, Edge<K,EV>, Edge<K, EV>> {
+			implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
 
 		@Override
 		public void coGroup(Iterable<Edge<K, EV>> edgesLeft, Iterable<Edge<K, EV>> edgesRight, Collector<Edge<K, EV>> out)
@@ -2149,12 +2149,12 @@ public class Graph<K, VV, EV> {
 
 		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
 				Collector<T> out) throws Exception {
-			function.iterateNeighbors(vertex.iterator().next(),	neighbors, out);
+			function.iterateNeighbors(vertex.iterator().next(), 	neighbors, out);
 		}
 
 		@Override
 		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
+			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 	function.getClass(), 3, null, null);
 		}
 	}
 
@@ -2209,7 +2209,7 @@ public class Graph<K, VV, EV> {
 
 		@Override
 		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
+			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index b2b7594..c8000e4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -39,7 +39,7 @@ public class TranslateEdgeValues<K, VV, OLD, NEW>
 extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 
 	// Required configuration
-	private TranslateFunction<OLD,NEW> translator;
+	private TranslateFunction<OLD, NEW> translator;
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index e079a41..58cb7e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -41,7 +41,7 @@ public class TranslateGraphIds<OLD, NEW, VV, EV>
 extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 
 	// Required configuration
-	private TranslateFunction<OLD,NEW> translator;
+	private TranslateFunction<OLD, NEW> translator;
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
index 7f37356..58266a5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -27,7 +27,7 @@ implements GraphGenerator<K, VV, EV> {
 	protected int parallelism = PARALLELISM_DEFAULT;
 
 	@Override
-	public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {
+	public GraphGenerator<K, VV, EV> setParallelism(int parallelism) {
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index 5339b14..a4996ab 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -58,14 +58,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		// Vertices
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
 		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
 
-		DataSet<Edge<LongValue,NullValue>> edges = env
+		DataSet<Edge<LongValue, NullValue>> edges = env
 			.fromParallelCollection(iterator, LongValue.class)
 				.setParallelism(parallelism)
 				.name("Edge iterators")
@@ -79,20 +79,20 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@ForwardedFields("*->f0")
 	public class LinkVertexToAll
-	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private final long vertexCount;
 
 		private LongValue target = new LongValue();
 
-		private Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
 
 		public LinkVertexToAll(long vertex_count) {
 			this.vertexCount = vertex_count;
 		}
 
 		@Override
-		public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out)
+		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
 				throws Exception {
 			edge.f0 = source;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index 2671efe..ce8b467 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -51,7 +51,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		return new GridGraph(env)
 			.addDimension(vertexCount, true)
 			.setParallelism(parallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 05bfd89..7ec368b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -60,16 +60,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		// Vertices
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
-		TypeInformation<Edge<LongValue,NullValue>> typeInformation = new TupleTypeInfo<>(
+		TypeInformation<Edge<LongValue, NullValue>> typeInformation = new TupleTypeInfo<>(
 			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
 
-		DataSource<Edge<LongValue,NullValue>> edges = env
-			.fromCollection(Collections.<Edge<LongValue,NullValue>>emptyList(), typeInformation)
+		DataSource<Edge<LongValue, NullValue>> edges = env
+			.fromCollection(Collections.<Edge<LongValue ,NullValue>>emptyList(), typeInformation)
 				.setParallelism(parallelism)
 				.name("Empty edge set");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
index 8fece81..f972d98 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
@@ -39,7 +39,7 @@ public interface GraphGenerator<K, VV, EV> {
 	 *
 	 * @return generated graph
 	 */
-	Graph<K,VV,EV> generate();
+	Graph<K, VV, EV> generate();
 
 	/**
 	 * Override the operator parallelism.
@@ -47,5 +47,5 @@ public interface GraphGenerator<K, VV, EV> {
 	 * @param parallelism operator parallelism
 	 * @return this
 	 */
-	GraphGenerator<K,VV,EV> setParallelism(int parallelism);
+	GraphGenerator<K, VV, EV> setParallelism(int parallelism);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index 01cb2d1..485394c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -42,7 +42,7 @@ public class GraphGeneratorUtils {
 	 * @param vertexCount number of sequential vertex labels
 	 * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices}
 	 */
-	public static DataSet<Vertex<LongValue,NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
+	public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
 		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount-1);
 
 		DataSource<LongValue> vertexLabels = env
@@ -58,9 +58,9 @@ public class GraphGeneratorUtils {
 
 	@ForwardedFields("*->f0")
 	private static class CreateVertex
-	implements MapFunction<LongValue, Vertex<LongValue,NullValue>> {
+	implements MapFunction<LongValue, Vertex<LongValue, NullValue>> {
 
-		private Vertex<LongValue,NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
+		private Vertex<LongValue, NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
 
 		@Override
 		public Vertex<LongValue, NullValue> map(LongValue value)
@@ -84,8 +84,8 @@ public class GraphGeneratorUtils {
 	 *
 	 * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
 	 */
-	public static <K,EV> DataSet<Vertex<K,NullValue>> vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
-		DataSet<Vertex<K,NullValue>> vertexSet = edges
+	public static <K, EV> DataSet<Vertex<K, NullValue>> vertexSet(DataSet<Edge<K, EV>> edges, int parallelism) {
+		DataSet<Vertex<K, NullValue>> vertexSet = edges
 			.flatMap(new EmitSrcAndTarget<K, EV>())
 				.setParallelism(parallelism)
 				.name("Emit source and target labels");
@@ -99,13 +99,13 @@ public class GraphGeneratorUtils {
 	/**
 	 * @see Graph.EmitSrcAndTarget
 	 */
-	private static final class EmitSrcAndTarget<K,EV>
-	implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {
+	private static final class EmitSrcAndTarget<K, EV>
+	implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
 
-		private Vertex<K,NullValue> output = new Vertex<>(null, new NullValue());
+		private Vertex<K, NullValue> output = new Vertex<>(null, new NullValue());
 
 		@Override
-		public void flatMap(Edge<K,EV> value, Collector<Vertex<K,NullValue>> out) throws Exception {
+		public void flatMap(Edge<K, EV> value, Collector<Vertex<K, NullValue>> out) throws Exception {
 			output.f0 = value.f0;
 			out.collect(output);
 			output.f0 = value.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 399a2f9..74ea764 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -44,7 +44,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private List<Tuple2<Long,Boolean>> dimensions = new ArrayList<>();
+	private List<Tuple2<Long, Boolean>> dimensions = new ArrayList<>();
 
 	private long vertexCount = 1;
 
@@ -84,18 +84,18 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		if (dimensions.isEmpty()) {
 			throw new RuntimeException("No dimensions added to GridGraph");
 		}
 
 		// Vertices
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
 		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
 
-		DataSet<Edge<LongValue,NullValue>> edges = env
+		DataSet<Edge<LongValue, NullValue>> edges = env
 			.fromParallelCollection(iterator, LongValue.class)
 				.setParallelism(parallelism)
 				.name("Edge iterators")
@@ -109,23 +109,23 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@ForwardedFields("*->f0")
 	public class LinkVertexToNeighbors
-	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private long vertexCount;
 
-		private List<Tuple2<Long,Boolean>> dimensions;
+		private List<Tuple2<Long, Boolean>> dimensions;
 
 		private LongValue target = new LongValue();
 
-		private Edge<LongValue,NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
 
-		public LinkVertexToNeighbors(long vertexCount, List<Tuple2<Long,Boolean>> dimensions) {
+		public LinkVertexToNeighbors(long vertexCount, List<Tuple2<Long, Boolean>> dimensions) {
 			this.vertexCount = vertexCount;
 			this.dimensions = dimensions;
 		}
 
 		@Override
-		public void flatMap(LongValue source, Collector<Edge<LongValue,NullValue>> out)
+		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
 				throws Exception {
 			edge.f0 = source;
 			long val = source.getValue();
@@ -136,7 +136,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 			// the value in the remaining dimensions
 			long remainder = val;
 
-			for (Tuple2<Long,Boolean> dimension : dimensions) {
+			for (Tuple2<Long, Boolean> dimension : dimensions) {
 				increment /= dimension.f0;
 
 				// the index within this dimension

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index 1aec723..db5e6bf 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -51,7 +51,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		return new GridGraph(env)
 			.addDimension(vertexCount, false)
 			.setParallelism(parallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 8a17b13..2a80a37 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -139,7 +139,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
 
 		// Edges
@@ -148,7 +148,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
 			.getRandomGenerables(edgeCount, cyclesPerEdge);
 
-		DataSet<Edge<LongValue,NullValue>> edges = env
+		DataSet<Edge<LongValue, NullValue>> edges = env
 			.fromCollection(generatorBlocks)
 				.name("Random generators")
 			.rebalance()
@@ -159,14 +159,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 				.name("RMat graph edges");
 
 		// Vertices
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
 
 		// Graph
 		return Graph.fromDataSet(vertices, edges, env);
 	}
 
 	private static final class GenerateEdges<T extends RandomGenerator>
-	implements FlatMapFunction<BlockInfo<T>, Edge<LongValue,NullValue>> {
+	implements FlatMapFunction<BlockInfo<T>, Edge<LongValue, NullValue>> {
 
 		// Configuration
 		private final long vertexCount;
@@ -190,9 +190,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 		private LongValue target = new LongValue();
 
-		private Edge<LongValue,NullValue> sourceToTarget = new Edge<>(source, target, NullValue.getInstance());
+		private Edge<LongValue, NullValue> sourceToTarget = new Edge<>(source, target, NullValue.getInstance());
 
-		private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
+		private Edge<LongValue, NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
 
 		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise) {
 			this.vertexCount = vertexCount;
@@ -206,7 +206,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		}
 
 		@Override
-		public void flatMap(BlockInfo<T> blockInfo, Collector<Edge<LongValue,NullValue>> out)
+		public void flatMap(BlockInfo<T> blockInfo, Collector<Edge<LongValue, NullValue>> out)
 				throws Exception {
 			RandomGenerator rng = blockInfo.getRandomGenerable().generator();
 			long edgesToGenerate = blockInfo.getElementCount();

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index e35b517..2eef7ae 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -58,16 +58,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		// Vertices
 		long vertexCount = 2 * this.vertexPairCount;
 
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
 		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
 
-		DataSet<Edge<LongValue,NullValue>> edges = env
+		DataSet<Edge<LongValue, NullValue>> edges = env
 			.fromParallelCollection(iterator, LongValue.class)
 				.setParallelism(parallelism)
 				.name("Edge iterators")
@@ -81,16 +81,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@ForwardedFields("*->f0")
 	private static class LinkVertexToSingletonNeighbor
-	implements MapFunction<LongValue, Edge<LongValue,NullValue>> {
+	implements MapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private LongValue source = new LongValue();
 
 		private LongValue target = new LongValue();
 
-		private Edge<LongValue,NullValue> edge = new Edge<>(source, target, NullValue.getInstance());
+		private Edge<LongValue, NullValue> edge = new Edge<>(source, target, NullValue.getInstance());
 
 		@Override
-		public Edge<LongValue,NullValue> map(LongValue value) throws Exception {
+		public Edge<LongValue, NullValue> map(LongValue value) throws Exception {
 			long val = value.getValue();
 
 			source.setValue(val);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index cb99f30..a47ae4d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -58,14 +58,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 	}
 
 	@Override
-	public Graph<LongValue,NullValue,NullValue> generate() {
+	public Graph<LongValue, NullValue, NullValue> generate() {
 		// Vertices
-		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
 		LongValueSequenceIterator iterator = new LongValueSequenceIterator(1, this.vertexCount - 1);
 
-		DataSet<Edge<LongValue,NullValue>> edges = env
+		DataSet<Edge<LongValue, NullValue>> edges = env
 			.fromParallelCollection(iterator, LongValue.class)
 				.setParallelism(parallelism)
 				.name("Edge iterators")
@@ -79,16 +79,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@ForwardedFields("*->f0")
 	public class LinkVertexToCenter
-	implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
 		private LongValue center = new LongValue(0);
 
-		private Edge<LongValue,NullValue> center_to_leaf = new Edge<>(center, null, NullValue.getInstance());
+		private Edge<LongValue, NullValue> center_to_leaf = new Edge<>(center, null, NullValue.getInstance());
 
-		private Edge<LongValue,NullValue> leaf_to_center = new Edge<>(null, center, NullValue.getInstance());
+		private Edge<LongValue, NullValue> leaf_to_center = new Edge<>(null, center, NullValue.getInstance());
 
 		@Override
-		public void flatMap(LongValue leaf, Collector<Edge<LongValue,NullValue>> out)
+		public void flatMap(LongValue leaf, Collector<Edge<LongValue, NullValue>> out)
 				throws Exception {
 			center_to_leaf.f1 = leaf;
 			out.collect(center_to_leaf);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index dabeb06..6296618 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -60,10 +60,10 @@ import java.util.List;
  * grouping on edges on the vertex with the smaller degree.
  */
 public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
-	GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
+	GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 
 	@Override
-	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
+	public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws Exception {
 
 		DataSet<Edge<K, EV>> edges = input.getEdges();
 
@@ -77,7 +77,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 		// project edges by vertex id
 		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
 
-		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
+		DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
 				// build triads
 				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
 				.reduceGroup(new TriadBuilder<K>())
@@ -268,10 +268,10 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 	 * Filters triads (three vertices connected by two edges) without a closing third edge.
 	 */
 	@SuppressWarnings("serial")
-	private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
+	private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K, NullValue>, Tuple3<K, K, K>> {
 
 		@Override
-		public Tuple3<K,K,K> join(Triad<K> triad, Edge<K, NullValue> edge) throws Exception {
+		public Tuple3<K, K, K> join(Triad<K> triad, Edge<K, NullValue> edge) throws Exception {
 			return new Tuple3<>(triad.getFirstVertex(), triad.getSecondVertex(), triad.getThirdVertex());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 93fb678..cad36b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -139,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			throws Exception {
 		// u, v, w, bitmask
 		DataSet<TriangleListing.Result<K>> triangles = input
-			.run(new TriangleListing<K,VV,EV>()
+			.run(new TriangleListing<K, VV, EV>()
 				.setLittleParallelism(littleParallelism));
 
 		// u, edge count

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index b22a0ce..99e3db9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -138,8 +138,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v, w
-		DataSet<Tuple3<K,K,K>> triangles = input
-			.run(new TriangleListing<K,VV,EV>()
+		DataSet<Tuple3<K, K, K>> triangles = input
+			.run(new TriangleListing<K, VV, EV>()
 				.setLittleParallelism(littleParallelism));
 
 		// u, 1

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 5b2502e..7e8ebd7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -170,7 +170,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 		DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices.map(
 				new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo);
 
-		final DeltaIteration<Vertex<K, VV>,	Tuple2<K, Either<NullValue, Message>>> iteration =
+		final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration =
 				initialVertices.iterateDelta(initialWorkSet, this.maximumNumberOfIterations, 0);
 		setUpIteration(iteration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index a378ab1..9f5585d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -569,7 +569,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
 
-		final DeltaIteration<Vertex<K, VV>,	Vertex<K, VV>> iteration =
+		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
 				initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
 				setUpIteration(iteration);
 
@@ -635,7 +635,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> verticesWithDegrees = initialVertices
 				.join(degrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {
+				.with(new FlatJoinFunction<Vertex<K, VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {
 					@Override
 					public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
 									Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
index 7a4a0e6..11e7a64 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -52,7 +52,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
 	private static Map<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>> cache =
 		Collections.synchronizedMap(new HashMap<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>>());
 
-	private Graph<K,VV,EV> input;
+	private Graph<K, VV, EV> input;
 
 	private NoOpOperator<T> wrappingOperator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 8ef87a5..14c2da4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -38,24 +38,24 @@ public class AsmTestBase {
 	protected ExecutionEnvironment env;
 
 	// simple graph
-	protected Graph<IntValue,NullValue,NullValue> directedSimpleGraph;
+	protected Graph<IntValue, NullValue, NullValue> directedSimpleGraph;
 
-	protected Graph<IntValue,NullValue,NullValue> undirectedSimpleGraph;
+	protected Graph<IntValue, NullValue, NullValue> undirectedSimpleGraph;
 
 	// complete graph
 	protected final long completeGraphVertexCount = 47;
 
-	protected Graph<LongValue,NullValue,NullValue> completeGraph;
+	protected Graph<LongValue, NullValue, NullValue> completeGraph;
 
 	// empty graph
 	protected final long emptyGraphVertexCount = 3;
 
-	protected Graph<LongValue,NullValue,NullValue> emptyGraph;
+	protected Graph<LongValue, NullValue, NullValue> emptyGraph;
 
 	// RMat graph
-	protected Graph<LongValue,NullValue,NullValue> directedRMatGraph;
+	protected Graph<LongValue, NullValue, NullValue> directedRMatGraph;
 
-	protected Graph<LongValue,NullValue,NullValue> undirectedRMatGraph;
+	protected Graph<LongValue, NullValue, NullValue> undirectedRMatGraph;
 
 	@Before
 	public void setup()
@@ -73,7 +73,7 @@ public class AsmTestBase {
 			new Object[]{5, 3},
 		};
 
-		List<Edge<IntValue,NullValue>> directedEdgeList = new LinkedList<>();
+		List<Edge<IntValue, NullValue>> directedEdgeList = new LinkedList<>();
 
 		for (Object[] edge : edges) {
 			directedEdgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
@@ -95,7 +95,7 @@ public class AsmTestBase {
 		long rmatVertexCount = 1L << 10;
 		long rmatEdgeCount = 16 * rmatVertexCount;
 
-		Graph<LongValue,NullValue,NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
+		Graph<LongValue, NullValue, NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
 			.generate();
 
 		directedRMatGraph = rmatGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index a4b3946..709317c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 public class SimplifyTest {
 
-	protected Graph<IntValue,NullValue,NullValue> graph;
+	protected Graph<IntValue, NullValue, NullValue> graph;
 
 	@Before
 	public void setup() {
@@ -65,7 +65,7 @@ public class SimplifyTest {
 			"(0,2,(null))\n" +
 			"(1,0,(null))";
 
-		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
 			.run(new Simplify<IntValue, NullValue, NullValue>());
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
index 2ace2bd..d589000 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 public class SimplifyTest {
 
-	protected Graph<IntValue,NullValue,NullValue> graph;
+	protected Graph<IntValue, NullValue, NullValue> graph;
 
 	@Before
 	public void setup() {
@@ -66,7 +66,7 @@ public class SimplifyTest {
 			"(1,0,(null))\n" +
 			"(2,0,(null))";
 
-		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
 			.run(new Simplify<IntValue, NullValue, NullValue>(false));
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
@@ -79,7 +79,7 @@ public class SimplifyTest {
 			"(0,1,(null))\n" +
 			"(1,0,(null))";
 
-		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+		Graph<IntValue, NullValue, NullValue> simpleGraph = graph
 			.run(new Simplify<IntValue, NullValue, NullValue>(true));
 
 		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index ba6bd05..7d6e3ea 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -86,7 +86,7 @@ public class TranslateTest {
 	@Test
 	public void testTranslateGraphIds()
 			throws Exception {
-		Graph<StringValue,LongValue, LongValue> stringIdGraph = graph
+		Graph<StringValue, LongValue, LongValue> stringIdGraph = graph
 			.translateGraphIds(new LongValueToStringValue());
 
 		for (Vertex<StringValue, LongValue> vertex : stringIdGraph.getVertices().collect()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
index 6c0e094..cb06da5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 4;
 
-		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
 			.generate();
 
 		String vertices = "0; 1; 2; 3";
@@ -50,7 +50,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 10;
 
-		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
 			.generate();
 
 		assertEquals(vertexCount, graph.numberOfVertices());
@@ -72,12 +72,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, 10)
+		Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, 10)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
index ec36aa7..be56f56 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
 	@Test
 	public void testGraph()
 			throws Exception {
-		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, 10)
+		Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, 10)
 			.generate();
 
 		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -49,7 +49,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 100;
 
-		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, vertexCount)
 			.generate();
 
 		assertEquals(vertexCount, graph.numberOfVertices());
@@ -71,12 +71,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, 100)
+		Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, 100)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
index d4a524f..c039607 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
 	@Test
 	public void testGraph()
 			throws Exception {
-		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, 10)
+		Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, 10)
 			.generate();
 
 		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -48,7 +48,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 100;
 
-		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, vertexCount)
 			.generate();
 
 		assertEquals(vertexCount, graph.numberOfVertices());
@@ -66,12 +66,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, 100)
+		Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, 100)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
index 9606d1a..6a01a34 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -79,14 +79,14 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+		Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
 			.addDimension(3, false)
 			.addDimension(5, false)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
index d723ecb..49b0ba7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -73,12 +73,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, 4)
+		Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, 4)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
index 3c3ce8c..b5c7cd3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
 	@Test
 	public void testGraph()
 			throws Exception {
-		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 10)
+		Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, 10)
 			.generate();
 
 		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -49,7 +49,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 100;
 
-		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
 			.generate();
 
 		assertEquals(vertexCount, graph.numberOfVertices());
@@ -71,12 +71,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 100)
+		Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, 100)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
index a06c63f..2cda69a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
@@ -44,7 +44,7 @@ extends AbstractGraphTest {
 
 		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
 
-		Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
 			.generate();
 
 		assertTrue(vertexCount >= graph.numberOfVertices());
@@ -58,12 +58,12 @@ extends AbstractGraphTest {
 
 		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
 
-		Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, 100, 1000)
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, 100, 1000)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
index 44a4d99..5e9a79a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexPairCount = 5;
 
-		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+		Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
 			.generate();
 
 		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -50,7 +50,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexPairCount = 10;
 
-		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+		Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
 			.generate();
 
 		assertEquals(2 * vertexPairCount, graph.numberOfVertices());
@@ -72,12 +72,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, 10)
+		Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, 10)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
index c656cfb..4ccb804 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 10;
 
-		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount)
 			.generate();
 
 		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -51,7 +51,7 @@ extends AbstractGraphTest {
 			throws Exception {
 		int vertexCount = 100;
 
-		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+		Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount)
 			.generate();
 
 		assertEquals(vertexCount, graph.numberOfVertices());
@@ -73,12 +73,12 @@ extends AbstractGraphTest {
 			throws Exception {
 		int parallelism = 2;
 
-		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 100)
+		Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, 100)
 			.setParallelism(parallelism)
 			.generate();
 
-		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
 		TestUtils.verifyParallelism(env, parallelism);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index a302a30..2cf9eac 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -48,7 +48,7 @@ public final class TestUtils {
 	 * @param <EV> the value type for edges
 	 * @throws Exception
 	 */
-	public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String expectedVertices, String expectedEdges)
+	public static <K, VV, EV> void compareGraph(Graph<K, VV, EV> graph, String expectedVertices, String expectedEdges)
 			throws Exception {
 		compareVertices(graph, expectedVertices);
 		compareEdges(graph, expectedEdges);
@@ -63,7 +63,7 @@ public final class TestUtils {
 				resultVertices.add(vertex.f0.toString());
 			}
 
-			TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n"));
+			TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s", "").replace(";", "\n"));
 		}
 	}
 
@@ -76,7 +76,7 @@ public final class TestUtils {
 				resultEdges.add(edge.f0.toString() + "," + edge.f1.toString());
 			}
 
-			TestBaseUtils.compareResultAsText(resultEdges, expectedEdges.replaceAll("\\s","").replace(";", "\n"));
+			TestBaseUtils.compareResultAsText(resultEdges, expectedEdges.replaceAll("\\s", "").replace(";", "\n"));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 1551d84..679bf4f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -46,8 +46,8 @@ extends AsmTestBase {
 		List<Tuple2<Double, Double>> expectedResults = new ArrayList<>();
 		expectedResults.add(Tuple2.of(0.5446287864731747, 0.0));
 		expectedResults.add(Tuple2.of(0.0, 0.8363240238999012));
-		expectedResults.add(Tuple2.of(0.6072453524686667,0.26848532437604833));
-		expectedResults.add(Tuple2.of(0.5446287864731747,0.39546603929699625));
+		expectedResults.add(Tuple2.of(0.6072453524686667, 0.26848532437604833));
+		expectedResults.add(Tuple2.of(0.5446287864731747, 0.39546603929699625));
 		expectedResults.add(Tuple2.of(0.0, 0.26848532437604833));
 		expectedResults.add(Tuple2.of(0.194966796646811, 0.0));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
index 04f0ca4..fb21c14 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
@@ -18,31 +18,31 @@
 
 package org.apache.flink.graph.pregel;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class PregelCompilerTest extends CompilerTestBase {
 
@@ -62,7 +62,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
 							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -136,7 +136,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+						.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
 							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
 								return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -210,7 +210,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
 							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
index 8f9552f..3bf2e32 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
@@ -19,26 +19,26 @@
 
 package org.apache.flink.graph.pregel;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DeltaIterationResultSet;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.TwoInputUdfOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class PregelTranslationTest {
@@ -71,7 +71,7 @@ public class PregelTranslationTest {
 				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
 				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
 							public Tuple3<String, String, NullValue> map(
 									Tuple2<String, String> edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 14c2fb4..676e0cd 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -18,33 +18,33 @@
 
 package org.apache.flink.graph.spargel;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 public class SpargelCompilerTest extends CompilerTestBase {
@@ -65,7 +65,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
 							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -147,7 +147,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 						.map(new Tuple2ToVertexMap<Long, Long>());
 
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+						.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
 							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
 								return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 2c7e093..47b785d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -72,7 +72,7 @@ public class SpargelTranslationTest {
 				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
 				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
 							public Tuple3<String, String, NullValue> map(
 									Tuple2<String, String> edge) {
@@ -158,7 +158,7 @@ public class SpargelTranslationTest {
 				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
 				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
 							public Tuple3<String, String, NullValue> map(
 									Tuple2<String, String> edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index 2213700..9100883 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -73,8 +73,8 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
 				new MessageFunction(), new UpdateFunction(), 10, parameters);
 
-		DataSet<Vertex<Long,Long>> data = res.getVertices();
-		List<Vertex<Long,Long>> result= data.collect();
+		DataSet<Vertex<Long, Long>> data = res.getVertices();
+		List<Vertex<Long, Long>> result= data.collect();
 
 		expectedResult = "1,11\n" +
 						"2,11\n" +
@@ -109,7 +109,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
 		DataSet<Vertex<Long, Long>> data = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
-        List<Vertex<Long,Long>> result= data.collect();
+        List<Vertex<Long, Long>> result= data.collect();
 
 		expectedResult = "1,11\n" +
 						"2,12\n" +