Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:13 UTC

[04/15] flink git commit: [FLINK-6709] [gelly] Activate strict checkstyle for flink-gellies

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
new file mode 100644
index 0000000..9f9bc06
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.linkanalysis.HITS.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link HITS}.
+ */
+public class HITSTest
+extends AsmTestBase {
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in sorted(hits[0]):
+		print('{}: {}, {}'.format(key, hits[0][key]/hubbiness_norm, hits[1][key]/authority_norm))
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(20)
+			.run(directedSimpleGraph);
+
+		List<Tuple2<Double, Double>> expectedResults = new ArrayList<>();
+		expectedResults.add(Tuple2.of(0.544643396306, 0.0));
+		expectedResults.add(Tuple2.of(0.0, 0.836329395866));
+		expectedResults.add(Tuple2.of(0.607227031134, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.544643396306, 0.395444899355));
+		expectedResults.add(Tuple2.of(0.0, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.194942233447, 0.0));
+
+		for (Result<IntValue> result : hits.collect()) {
+			int id = result.f0.getValue();
+			assertEquals(expectedResults.get(id).f0, result.getHubScore().getValue(), 0.000001);
+			assertEquals(expectedResults.get(id).f1, result.getAuthorityScore().getValue(), 0.000001);
+		}
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		double expectedScore = 1.0 / Math.sqrt(completeGraphVertexCount);
+
+		DataSet<Result<LongValue>> hits = new HITS<LongValue, NullValue, NullValue>(0.000001)
+			.run(completeGraph);
+
+		List<Result<LongValue>> results = hits.collect();
+
+		assertEquals(completeGraphVertexCount, results.size());
+
+		for (Result<LongValue> result : results) {
+			assertEquals(expectedScore, result.getHubScore().getValue(), 0.000001);
+			assertEquals(expectedScore, result.getAuthorityScore().getValue(), 0.000001);
+		}
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}, {}'.format(key, hits[0][str(key)]/hubbiness_norm, hits[1][str(key)]/authority_norm))
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Result<LongValue>> hits = directedRMatGraph(10, 16)
+			.run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result : new Collect<Result<LongValue>>().run(hits).execute()) {
+			results.put(result.f0.getValue(), result);
+		}
+
+		assertEquals(902, results.size());
+
+		Map<Long, Tuple2<Double, Double>> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, Tuple2.of(0.231077034747, 0.238110214937));
+		expectedResults.put(1L, Tuple2.of(0.162364053933, 0.169679504287));
+		expectedResults.put(2L, Tuple2.of(0.162412612499, 0.161015667261));
+		expectedResults.put(8L, Tuple2.of(0.167064641724, 0.158592966505));
+		expectedResults.put(13L, Tuple2.of(0.041915595624, 0.0407091625629));
+		expectedResults.put(29L, Tuple2.of(0.0102017346511, 0.0146218045999));
+		expectedResults.put(109L, Tuple2.of(0.00190531000389, 0.00481944993023));
+		expectedResults.put(394L, Tuple2.of(0.0122287016161, 0.0147987969538));
+		expectedResults.put(652L, Tuple2.of(0.010966659242, 0.0113713306749));
+		expectedResults.put(1020L, Tuple2.of(0.0, 0.000326973732127));
+
+		for (Map.Entry<Long, Tuple2<Double, Double>> expected : expectedResults.entrySet()) {
+			double hubScore = results.get(expected.getKey()).getHubScore().getValue();
+			double authorityScore = results.get(expected.getKey()).getAuthorityScore().getValue();
+
+			assertEquals(expected.getValue().f0, hubScore, 0.00001);
+			assertEquals(expected.getValue().f1, authorityScore, 0.00001);
+		}
+	}
+}
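
Note: networkx normalizes HITS scores to sum to one, while the scores
asserted above are scaled to unit Euclidean (L2) norm, which is why the
verification script divides each score by math.sqrt(sum(v*v ...)). The
same scaling explains testWithCompleteGraph: all vertices share one
score s, and the unit-norm constraint n * s^2 = 1 gives s = 1 / sqrt(n).
A minimal, self-contained sketch of that rescaling (hypothetical helper,
not part of this commit):

	public class L2NormalizeSketch {
		// Rescales scores to unit Euclidean norm, the scaling the
		// assertions above compare against.
		static double[] l2Normalize(double[] scores) {
			double sumOfSquares = 0.0;
			for (double score : scores) {
				sumOfSquares += score * score;
			}
			double norm = Math.sqrt(sumOfSquares);
			double[] normalized = new double[scores.length];
			for (int i = 0; i < scores.length; i++) {
				normalized[i] = scores[i] / norm;
			}
			return normalized;
		}

		public static void main(String[] args) {
			// networkx-style scores (sum = 1) rescaled to L2 norm 1.
			for (double score : l2Normalize(new double[]{0.5, 0.3, 0.2})) {
				System.out.println(score);
			}
		}
	}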

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
new file mode 100644
index 0000000..9c3de71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/PageRankTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link PageRank}.
+ */
+public class PageRankTest
+extends AsmTestBase {
+
+	private static final double DAMPING_FACTOR = 0.85;
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in sorted(pagerank):
+		print('{}: {}'.format(key, pagerank[key]))
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Result<IntValue>> pr = new PageRank<IntValue, NullValue, NullValue>(DAMPING_FACTOR, 10)
+			.run(directedSimpleGraph);
+
+		List<Double> expectedResults = new ArrayList<>();
+		expectedResults.add(0.09091296131286301);
+		expectedResults.add(0.27951855944178117);
+		expectedResults.add(0.12956847924535586);
+		expectedResults.add(0.22329643739217675);
+		expectedResults.add(0.18579060129496028);
+		expectedResults.add(0.09091296131286301);
+
+		for (Tuple2<IntValue, DoubleValue> result : pr.collect()) {
+			int id = result.f0.getValue();
+			assertEquals(expectedResults.get(id), result.f1.getValue(), 0.000001);
+		}
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		double expectedScore = 1.0 / completeGraphVertexCount;
+
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(completeGraph);
+
+		List<Result<LongValue>> results = pr.collect();
+
+		assertEquals(completeGraphVertexCount, results.size());
+
+		for (Tuple2<LongValue, DoubleValue> result : results) {
+			assertEquals(expectedScore, result.f1.getValue(), 0.000001);
+		}
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}'.format(key, pagerank[str(key)]))
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(directedRMatGraph(10, 16));
+
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result : new Collect<Result<LongValue>>().run(pr).execute()) {
+			results.put(result.getVertexId0().getValue(), result);
+		}
+
+		assertEquals(902, results.size());
+
+		Map<Long, Double> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, 0.027111807822);
+		expectedResults.put(1L, 0.0132842310382);
+		expectedResults.put(2L, 0.0121818392504);
+		expectedResults.put(8L, 0.0115916809743);
+		expectedResults.put(13L, 0.00183249490033);
+		expectedResults.put(29L, 0.000848095047082);
+		expectedResults.put(109L, 0.000308507844048);
+		expectedResults.put(394L, 0.000828743280246);
+		expectedResults.put(652L, 0.000684102931253);
+		expectedResults.put(1020L, 0.000250487135148);
+
+		for (Map.Entry<Long, Double> expected : expectedResults.entrySet()) {
+			double value = results.get(expected.getKey()).getPageRankScore().getValue();
+
+			assertEquals(expected.getValue(), value, 0.00001);
+		}
+	}
+}
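
Note: the two-argument constructors above exercise both termination
modes of the algorithm: an int second argument (new
PageRank<>(DAMPING_FACTOR, 10)) fixes the number of iterations, while a
double (new PageRank<>(DAMPING_FACTOR, 0.000001)) iterates until the
scores converge to within that threshold; this reading is inferred from
the tests themselves. The complete-graph expectation also follows by
symmetry: every vertex of a complete graph receives the same rank and
the ranks sum to one, so each score is 1.0 / completeGraphVertexCount.
A hedged construction sketch:

	import org.apache.flink.graph.library.linkanalysis.PageRank;
	import org.apache.flink.types.LongValue;
	import org.apache.flink.types.NullValue;

	public class PageRankConstructionSketch {
		public static void main(String[] args) {
			// Fixed iteration count, as in testWithSimpleGraph:
			PageRank<LongValue, NullValue, NullValue> fixedIterations =
				new PageRank<>(0.85, 10);

			// Run until convergence, as in testWithCompleteGraph and
			// testWithRMatGraph (threshold semantics assumed):
			PageRank<LongValue, NullValue, NullValue> untilConverged =
				new PageRank<>(0.85, 0.000001);
		}
	}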

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
index 24f0c2d..9b1d18c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -22,10 +22,14 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.test.TestGraphUtils;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link ChecksumHashCode}.
+ */
 public class ChecksumHashCodeTest
 extends AsmTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
index 117b3ae..05042c2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.graph.library.metric.directed;
 
-import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link EdgeMetrics}.
+ */
 public class EdgeMetricsTest
 extends AsmTestBase {
 
@@ -47,7 +51,7 @@ extends AsmTestBase {
 	public void testWithCompleteGraph()
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
-		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
 		Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3,
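
Note: this hunk and the three metric-test hunks that follow apply the
same checkstyle fixes, alongside the class-level Javadoc added
throughout: imports are regrouped so org.apache.flink imports come first
and third-party imports (commons-math3, junit) follow in their own
block, and a space is required after a typecast. The triplet arithmetic
in the context lines is unchanged: a vertex of degree d anchors
C(d, 2) = d * (d - 1) / 2 triplets, so each vertex of a complete graph
on n vertices contributes C(n - 1, 2). A self-contained sketch of the
cast fix (with a stand-in for commons-math3 so it compiles alone):

	public class CastWhitespaceSketch {
		public static void main(String[] args) {
			long expectedDegree = 4;
			// Checkstyle now requires "(int) expectedDegree", not
			// "(int)expectedDegree", after a value cast.
			long maximumTriplets = binomialCoefficient((int) expectedDegree, 2);
			System.out.println(maximumTriplets); // C(4, 2) = 6
		}

		// Minimal stand-in for CombinatoricsUtils.binomialCoefficient.
		static long binomialCoefficient(int n, int k) {
			long result = 1;
			for (int i = 1; i <= k; i++) {
				result = result * (n - k + i) / i;
			}
			return result;
		}
	}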

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index 54301f5..f72a7bb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.graph.library.metric.directed;
 
-import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link VertexMetrics}.
+ */
 public class VertexMetricsTest
 extends AsmTestBase {
 
@@ -48,7 +52,7 @@ extends AsmTestBase {
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
 		long expectedBidirectionalEdges = completeGraphVertexCount * expectedDegree / 2;
-		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
 		Result expectedResult = new Result(completeGraphVertexCount, 0, expectedBidirectionalEdges,

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
index b4e9f95..3e23906 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.graph.library.metric.undirected;
 
-import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link EdgeMetrics}.
+ */
 public class EdgeMetricsTest
 extends AsmTestBase {
 
@@ -47,7 +51,7 @@ extends AsmTestBase {
 	public void testWithCompleteGraph()
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
-		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
 		Result expectedResult = new Result(expectedTriplets / 3, 2 * expectedTriplets / 3,

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
index 848ad79..71e587b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -18,16 +18,20 @@
 
 package org.apache.flink.graph.library.metric.undirected;
 
-import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.apache.flink.graph.asm.AsmTestBase;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link VertexMetrics}.
+ */
 public class VertexMetricsTest
 extends AsmTestBase {
 
@@ -48,7 +52,7 @@ extends AsmTestBase {
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
 		long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
-		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
 		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets,

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
index 76b28da..aa259a2 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
@@ -25,20 +25,24 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link AdamicAdar}.
+ */
 public class AdamicAdarTest
 extends AsmTestBase {
 
 	private float[] ilog = {
-		1.0f / (float)Math.log(2),
-		1.0f / (float)Math.log(3),
-		1.0f / (float)Math.log(3),
-		1.0f / (float)Math.log(4),
-		1.0f / (float)Math.log(1),
-		1.0f / (float)Math.log(1)
+		1.0f / (float) Math.log(2),
+		1.0f / (float) Math.log(3),
+		1.0f / (float) Math.log(3),
+		1.0f / (float) Math.log(4),
+		1.0f / (float) Math.log(1),
+		1.0f / (float) Math.log(1)
 	};
 
 	@Test
@@ -98,7 +102,7 @@ extends AsmTestBase {
 	@Test
 	public void testCompleteGraph()
 			throws Exception {
-		float expectedScore = (completeGraphVertexCount - 2) / (float)Math.log(completeGraphVertexCount - 1);
+		float expectedScore = (completeGraphVertexCount - 2) / (float) Math.log(completeGraphVertexCount - 1);
 
 		DataSet<Result<LongValue>> aa = completeGraph
 			.run(new AdamicAdar<LongValue, NullValue, NullValue>());
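
Note on the expected value: Adamic-Adar scores a vertex pair by summing
1 / ln(degree(w)) over the common neighbors w, which is what the ilog
weights above encode. In a complete graph every pair of vertices shares
the remaining completeGraphVertexCount - 2 vertices as common neighbors,
each of degree completeGraphVertexCount - 1, which yields the expected
score (n - 2) / ln(n - 1) computed above. A worked sketch with a
hypothetical vertex count:

	public class AdamicAdarExpectationSketch {
		public static void main(String[] args) {
			int n = 10; // hypothetical completeGraphVertexCount
			// n - 2 common neighbors, each contributing 1 / ln(n - 1):
			float expectedScore = (n - 2) / (float) Math.log(n - 1);
			System.out.println(expectedScore); // 8 / ln(9) ~= 3.641
		}
	}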

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 2443359..d8cd298 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -27,10 +27,14 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link JaccardIndex}.
+ */
 public class JaccardIndexTest
 extends AsmTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
index fb21c14..71937db 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
@@ -38,12 +38,15 @@ import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+/**
+ * Validate compiled {@link VertexCentricIteration} programs.
+ */
 public class PregelCompilerTest extends CompilerTestBase {
 
 	private static final long serialVersionUID = 1L;
@@ -51,223 +54,198 @@ public class PregelCompilerTest extends CompilerTestBase {
 	@SuppressWarnings("serial")
 	@Test
 	public void testPregelCompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
-						}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		// compose test program
+		{
+
+			DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+				new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+				.map(new Tuple2ToVertexMap<Long, Long>());
+
+			DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+				.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+					public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+						return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+					}
 				});
 
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-				
-				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-						new CCCompute(), null, 100).getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Pregel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof SingleInputPlanNode);
-			
-			SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-			
-			// check the computation coGroup
-			DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
-			assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
-			assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
+			DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+				new CCCompute(), null, 100).getVertices();
+
+			result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
 		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+
+		Plan p = env.createProgramPlan("Pregel Connected Components");
+		OptimizedPlan op = compileNoStats(p);
+
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+		// check the solution set delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+		SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+		// check the computation coGroup
+		DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+		assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+		assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+		assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
 	}
-	
+
 	@SuppressWarnings("serial")
 	@Test
 	public void testPregelCompilerWithBroadcastVariable() {
-		try {
-			final String BC_VAR_NAME = "borat variable";
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> bcVar = env.fromElements(1L);
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-						.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-								return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-					});
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
-				parameters.addBroadcastSet(BC_VAR_NAME, bcVar);
-
-				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-						new CCCompute(), null, 100, parameters)
-						.getVertices();
-					
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+		final String broadcastSetName = "broadcast";
 
-			}
-			
-			Plan p = env.createProgramPlan("Pregel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof SingleInputPlanNode);
-			
-			SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-			
-			// check the computation coGroup
-			DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
-			assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
-			assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		// compose test program
+		{
+			DataSet<Long> bcVar = env.fromElements(1L);
+
+			DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+				new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+				.map(new Tuple2ToVertexMap<Long, Long>());
+
+			DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+				.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+					public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+						return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+					}
+				});
+
+			Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+			VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+			parameters.addBroadcastSet(broadcastSetName, bcVar);
+
+			DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+				new CCCompute(), null, 100, parameters)
+				.getVertices();
+
+			result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
 		}
+
+		Plan p = env.createProgramPlan("Pregel Connected Components");
+		OptimizedPlan op = compileNoStats(p);
+
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+		// check the solution set delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+		SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+		// check the computation coGroup
+		DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+		assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+		assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+		assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
 	}
 
 	@SuppressWarnings("serial")
 	@Test
 	public void testPregelWithCombiner() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
-						}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+		// compose test program
+		{
+
+			DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+				new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+				.map(new Tuple2ToVertexMap<Long, Long>());
+
+			DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+				.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+					public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+						return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+					}
 				});
 
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-				
-				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
-						new CCCompute(), new CCCombiner(), 100).getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Pregel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-
-			// check the combiner
-			SingleInputPlanNode combiner = (SingleInputPlanNode) iteration.getInput2().getSource();
-			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
-			
-			// check the solution set delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof SingleInputPlanNode);
-			
-			SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
-			
-			// check the computation coGroup
-			DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
-			assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
-			assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
-			assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
+			DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+				new CCCompute(), new CCCombiner(), 100).getVertices();
+
+			result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
 		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+
+		Plan p = env.createProgramPlan("Pregel Connected Components");
+		OptimizedPlan op = compileNoStats(p);
+
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+		// check the combiner
+		SingleInputPlanNode combiner = (SingleInputPlanNode) iteration.getInput2().getSource();
+		assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+
+		// check the solution set delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof SingleInputPlanNode);
+
+		SingleInputPlanNode ssFlatMap = (SingleInputPlanNode) ((SingleInputPlanNode) (ssDelta)).getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, ssFlatMap.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, ssFlatMap.getInput().getShipStrategy());
+
+		// check the computation coGroup
+		DualInputPlanNode computationCoGroup = (DualInputPlanNode) (ssFlatMap.getInput().getSource());
+		assertEquals(DEFAULT_PARALLELISM, computationCoGroup.getParallelism());
+		assertEquals(ShipStrategyType.FORWARD, computationCoGroup.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, computationCoGroup.getInput2().getShipStrategy());
+		assertTrue(computationCoGroup.getInput2().getTempMode().isCached());
+
+		assertEquals(new FieldList(0), computationCoGroup.getInput2().getShipStrategyKeys());
+
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
 	}
 
 	@SuppressWarnings("serial")
@@ -283,7 +261,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 
 			if ((getSuperstepNumber() == 1) || (currentComponent < vertex.getValue())) {
 				setNewVertexValue(currentComponent);
-				for (Edge<Long, NullValue> edge: getEdges()) {
+				for (Edge<Long, NullValue> edge : getEdges()) {
 					sendMessageTo(edge.getTarget(), currentComponent);
 				}
 			}
@@ -291,16 +269,15 @@ public class PregelCompilerTest extends CompilerTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class CCCombiner extends MessageCombiner<Long, Long> {
+	private static final class CCCombiner extends MessageCombiner<Long, Long> {
 
 		public void combineMessages(MessageIterator<Long> messages) {
 
 			long minMessage = Long.MAX_VALUE;
-			for (Long msg: messages) {
+			for (Long msg : messages) {
 				minMessage = Math.min(minMessage, msg);
 			}
 			sendCombinedMessage(minMessage);
 		}
 	}
-
 }
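
Note: nearly all of the churn in this file is one mechanical change:
the try { ... } catch (Exception e) { ... fail(e.getMessage()); }
wrapper around each test body is removed, so an unexpected exception now
fails the test directly with its full stack trace instead of being
flattened into a fail() message (the indentation shifts left
accordingly). A minimal sketch of the pattern, with a hypothetical
stand-in for the plan construction:

	import org.junit.Test;

	public class ExceptionPropagationSketch {
		// Before this commit the body was wrapped as:
		//
		//     try {
		//         buildAndCompilePlan();
		//     } catch (Exception e) {
		//         e.printStackTrace();
		//         fail(e.getMessage());
		//     }
		//
		// Letting the exception propagate gives JUnit the real stack trace.
		@Test
		public void testCompilesWithoutWrapper() throws Exception {
			buildAndCompilePlan();
		}

		private static void buildAndCompilePlan() throws Exception {
			// hypothetical stand-in for composing and compiling the program
		}
	}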

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
index 3bf2e32..8084e71 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.graph.pregel;
 
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
@@ -33,100 +32,92 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+/**
+ * Test the creation of a {@link VertexCentricIteration} program.
+ */
 @SuppressWarnings("serial")
 public class PregelTranslationTest {
 
+	private static final String ITERATION_NAME = "Test Name";
+
+	private static final String AGGREGATOR_NAME = "AggregatorName";
+
+	private static final String BC_SET_NAME = "broadcast messages";
+
+	private static final int NUM_ITERATIONS = 13;
+
+	private static final int ITERATION_parallelism = 77;
+
 	@Test
 	public void testTranslationPlainEdges() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-			
-			final String AGGREGATOR_NAME = "AggregatorName";
-			
-			final String BC_SET_NAME = "borat messages";
-
-			final int NUM_ITERATIONS = 13;
-			
-			final int ITERATION_parallelism = 77;
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<Long> bcVar = env.fromElements(1L);
-			
-			DataSet<Vertex<String, Double>> result;
-			
-			// ------------ construct the test program ------------------
-			{
-				
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
-
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
-
-				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
-
-							public Tuple3<String, String, NullValue> map(
-									Tuple2<String, String> edge) {
-								return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-						}), env);
-
-				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
-
-				parameters.addBroadcastSet(BC_SET_NAME, bcVar);
-				parameters.setName(ITERATION_NAME);
-				parameters.setParallelism(ITERATION_parallelism);
-				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-
-				result = graph.runVertexCentricIteration(new MyCompute(), null,
-						NUM_ITERATIONS, parameters).getVertices();
-
-				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
-			}
-			
-			
-			// ------------- validate the java program ----------------
-			
-			assertTrue(result instanceof DeltaIterationResultSet);
-			
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-			
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-			
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-			
-			TwoInputUdfOperator<?, ?, ?, ?> computationCoGroup =
-					(TwoInputUdfOperator<?, ?, ?, ?>) ((SingleInputUdfOperator<?, ?, ?>) resultSet.getNextWorkset()).getInput();
-			
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcVar, computationCoGroup.getBroadcastSets().get(BC_SET_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Long> bcVar = env.fromElements(1L);
+
+		DataSet<Vertex<String, Double>> result;
+
+		// ------------ construct the test program ------------------
+
+		DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+
+		DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+
+		Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+			edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+
+				public Tuple3<String, String, NullValue> map(
+					Tuple2<String, String> edge) {
+					return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+				}
+			}), env);
+
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		parameters.addBroadcastSet(BC_SET_NAME, bcVar);
+		parameters.setName(ITERATION_NAME);
+		parameters.setParallelism(ITERATION_parallelism);
+		parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+
+		result = graph.runVertexCentricIteration(new MyCompute(), null,
+			NUM_ITERATIONS, parameters).getVertices();
+
+		result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
+
+		// ------------- validate the java program ----------------
+
+		assertTrue(result instanceof DeltaIterationResultSet);
+
+		DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+		DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
+
+		// check the basic iteration properties
+		assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+		assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+		assertEquals(ITERATION_parallelism, iteration.getParallelism());
+		assertEquals(ITERATION_NAME, iteration.getName());
+
+		assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+
+		TwoInputUdfOperator<?, ?, ?, ?> computationCoGroup =
+			(TwoInputUdfOperator<?, ?, ?, ?>) ((SingleInputUdfOperator<?, ?, ?>) resultSet.getNextWorkset()).getInput();
+
+		// validate that the broadcast sets are forwarded
+		assertEquals(bcVar, computationCoGroup.getBroadcastSets().get(BC_SET_NAME));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	private static final class MyCompute extends ComputeFunction<String, Double, NullValue, Double> {
 
 		@Override
-		public void compute(Vertex<String, Double> vertex,
-				MessageIterator<Double> messages) throws Exception {}
+		public void compute(Vertex<String, Double> vertex, MessageIterator<Double> messages) throws Exception {
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 676e0cd..1c6d08e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -40,13 +40,15 @@ import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 
+/**
+ * Validate compiled {@link ScatterGatherIteration} programs.
+ */
 public class SpargelCompilerTest extends CompilerTestBase {
 
 	private static final long serialVersionUID = 1L;
@@ -54,161 +56,143 @@ public class SpargelCompilerTest extends CompilerTestBase {
 	@SuppressWarnings("serial")
 	@Test
 	public void testSpargelCompiler() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-							return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
-						}
-				});
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-				DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
-						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
-						new ConnectedComponents.CCUpdater<Long, Long>(), 100)
-						.getVertices();
-				
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-			
-			// check that the initial workset sort is outside the loop
-			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
-			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		// compose test program
+		DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+			new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+			.map(new Tuple2ToVertexMap<Long, Long>());
+
+		DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+			.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+				public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+					return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+				}
+			});
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+		DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
+			new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+			new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+			.getVertices();
+
+		result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+		Plan p = env.createProgramPlan("Spargel Connected Components");
+		OptimizedPlan op = compileNoStats(p);
+
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+		// check the solution set join and the delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+		DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+		assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+		// check the workset join
+		DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+		assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+		assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+		assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+		assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+
+		// check that the initial workset sort is outside the loop
+		assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+		assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
 	}
-	
+
 	@SuppressWarnings("serial")
 	@Test
 	public void testSpargelCompilerWithBroadcastVariable() {
-		try {
-			final String BC_VAR_NAME = "borat variable";
-			
-			
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(DEFAULT_PARALLELISM);
-			// compose test program
-			{
-				DataSet<Long> bcVar = env.fromElements(1L);
-
-				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
-						new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-						.map(new Tuple2ToVertexMap<Long, Long>());
-
-				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
-						.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
-								return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-					});
-
-				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
-				parameters.addBroadcastSetForScatterFunction(BC_VAR_NAME, bcVar);
-				parameters.addBroadcastSetForGatherFunction(BC_VAR_NAME, bcVar);
-
-				DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
-						new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
-						new ConnectedComponents.CCUpdater<Long, Long>(), 100)
-						.getVertices();
-					
-				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-
-			}
-			
-			Plan p = env.createProgramPlan("Spargel Connected Components");
-			OptimizedPlan op = compileNoStats(p);
-			
-			// check the sink
-			SinkPlanNode sink = op.getDataSinks().iterator().next();
-			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
-			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
-			
-			// check the iteration
-			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
-			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
-			
-			// check the solution set join and the delta
-			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
-			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update functions preserves the partitioning
-			
-			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
-			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
-			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
-			
-			// check the workset set join
-			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
-			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
-			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
-			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
-			
-			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
-			
-			// check that the initial partitioning is pushed out of the loop
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
-			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
-			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
-			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		final String broadcastVariableName = "broadcast variable";
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		// compose test program
+
+		DataSet<Long> bcVar = env.fromElements(1L);
+
+		DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+			new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
+			.map(new Tuple2ToVertexMap<Long, Long>());
+
+		DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
+			.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+				public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+					return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
+				}
+			});
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+		parameters.addBroadcastSetForScatterFunction(broadcastVariableName, bcVar);
+		parameters.addBroadcastSetForGatherFunction(broadcastVariableName, bcVar);
+
+		DataSet<Vertex<Long, Long>> result = graph.runScatterGatherIteration(
+			new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
+			new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+			.getVertices();
+
+		result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+		Plan p = env.createProgramPlan("Spargel Connected Components");
+		OptimizedPlan op = compileNoStats(p);
+
+		// check the sink
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+		assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+
+		// check the iteration
+		WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+		assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+
+		// check the solution set join and the delta
+		PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+		assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+
+		DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+		assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+		assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+
+		// check the workset join
+		DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+		assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+		assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+		assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+
+		assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+
+		// check that the initial partitioning is pushed out of the loop
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+		assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+		assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
 	}
 }
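
For context, the tests above only compile the program into an OptimizedPlan and never execute it. A minimal runnable sketch of the same connected-components iteration, reusing the Gelly classes exercised by the test, could look like the following; the collect() sink and the expected labels are illustrative assumptions, not part of the commit.

	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	// two vertices and a single edge 1 -> 2
	DataSet<Vertex<Long, Long>> vertices = env.fromElements(
		new Vertex<>(1L, 1L), new Vertex<>(2L, 2L));
	DataSet<Edge<Long, NullValue>> edges = env.fromElements(
		new Edge<>(1L, 2L, NullValue.getInstance()));

	Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);

	// the same scatter-gather connected-components iteration as in the test,
	// but collected instead of discarded
	List<Vertex<Long, Long>> components = graph
		.runScatterGatherIteration(
			new ConnectedComponents.CCMessenger<Long, Long>(BasicTypeInfo.LONG_TYPE_INFO),
			new ConnectedComponents.CCUpdater<Long, Long>(), 100)
		.getVertices()
		.collect(); // vertex 2 should adopt the smaller component id, 1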

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 47b785d..d209a2d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -31,187 +31,156 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
+/**
+ * Tests the creation of a {@link ScatterGatherIteration} program.
+ */
 @SuppressWarnings("serial")
 public class SpargelTranslationTest {
 
-	@Test
-	public void testTranslationPlainEdges() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-
-			final String AGGREGATOR_NAME = "AggregatorName";
-
-			final String BC_SET_MESSAGES_NAME = "borat messages";
+	private static final String ITERATION_NAME = "Test Name";
 
-			final String BC_SET_UPDATES_NAME = "borat updates";
+	private static final String AGGREGATOR_NAME = "AggregatorName";
 
-			final int NUM_ITERATIONS = 13;
+	private static final String BC_SET_MESSAGES_NAME = "broadcast messages";
 
-			final int ITERATION_parallelism = 77;
+	private static final String BC_SET_UPDATES_NAME = "broadcast updates";
 
+	private static final int NUM_ITERATIONS = 13;
 
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+	private static final int ITERATION_PARALLELISM = 77;
 
-			DataSet<Long> bcMessaging = env.fromElements(1L);
-			DataSet<Long> bcUpdate = env.fromElements(1L);
+	@Test
+	public void testTranslationPlainEdges() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-			DataSet<Vertex<String, Double>> result;
+		DataSet<Long> bcMessaging = env.fromElements(1L);
+		DataSet<Long> bcUpdate = env.fromElements(1L);
 
-			// ------------ construct the test program ------------------
-			{
+		DataSet<Vertex<String, Double>> result;
 
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+		// ------------ construct the test program ------------------
 
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+		DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
 
-				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+		DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
-							public Tuple3<String, String, NullValue> map(
-									Tuple2<String, String> edge) {
-								return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-						}), env);
+		Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+			edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
-				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+				public Tuple3<String, String, NullValue> map(
+					Tuple2<String, String> edge) {
+					return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+				}
+			}), env);
 
-				parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
-				parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
-				parameters.setName(ITERATION_NAME);
-				parameters.setParallelism(ITERATION_parallelism);
-				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
-				result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
-						NUM_ITERATIONS, parameters).getVertices();
+		parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcMessaging);
+		parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcUpdate);
+		parameters.setName(ITERATION_NAME);
+		parameters.setParallelism(ITERATION_PARALLELISM);
+		parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 
-				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
-			}
+		result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
+			NUM_ITERATIONS, parameters).getVertices();
 
+		result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
 
-			// ------------- validate the java program ----------------
+		// ------------- validate the java program ----------------
 
-			assertTrue(result instanceof DeltaIterationResultSet);
+		assertTrue(result instanceof DeltaIterationResultSet);
 
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
+		DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+		DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
 
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
+		// check the basic iteration properties
+		assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+		assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+		assertEquals(ITERATION_PARALLELISM, iteration.getParallelism());
+		assertEquals(ITERATION_NAME, iteration.getName());
 
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+		assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
 
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+		// validate that the semantic properties are set as they should
+		TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+		assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
 
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
+		TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
 
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		// validate that the broadcast sets are forwarded
+		assertEquals(bcUpdate, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+		assertEquals(bcMessaging, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
 	}
 
 	@Test
 	public void testTranslationPlainEdgesWithForkedBroadcastVariable() {
-		try {
-			final String ITERATION_NAME = "Test Name";
-
-			final String AGGREGATOR_NAME = "AggregatorName";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-			final String BC_SET_MESSAGES_NAME = "borat messages";
+		DataSet<Long> bcVar = env.fromElements(1L);
 
-			final String BC_SET_UPDATES_NAME = "borat updates";
+		DataSet<Vertex<String, Double>> result;
 
-			final int NUM_ITERATIONS = 13;
+		// ------------ construct the test program ------------------
 
-			final int ITERATION_parallelism = 77;
+		DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
 
+		DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
 
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
+			edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
-			DataSet<Long> bcVar = env.fromElements(1L);
+				public Tuple3<String, String, NullValue> map(
+					Tuple2<String, String> edge) {
+					return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
+				}
+			}), env);
 
-			DataSet<Vertex<String, Double>> result;
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
 
-			// ------------ construct the test program ------------------
-			{
+		parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
+		parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
+		parameters.setName(ITERATION_NAME);
+		parameters.setParallelism(ITERATION_PARALLELISM);
+		parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
 
-				DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
+		result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
+			NUM_ITERATIONS, parameters).getVertices();
 
-				DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
+		result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
 
-				Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
-						edges.map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
+		// ------------- validate the java program ----------------
 
-							public Tuple3<String, String, NullValue> map(
-									Tuple2<String, String> edge) {
-								return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
-							}
-						}), env);
+		assertTrue(result instanceof DeltaIterationResultSet);
 
-				ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+		DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+		DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
 
-				parameters.addBroadcastSetForScatterFunction(BC_SET_MESSAGES_NAME, bcVar);
-				parameters.addBroadcastSetForGatherFunction(BC_SET_UPDATES_NAME, bcVar);
-				parameters.setName(ITERATION_NAME);
-				parameters.setParallelism(ITERATION_parallelism);
-				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+		// check the basic iteration properties
+		assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+		assertArrayEquals(new int[]{0}, resultSet.getKeyPositions());
+		assertEquals(ITERATION_PARALLELISM, iteration.getParallelism());
+		assertEquals(ITERATION_NAME, iteration.getName());
 
-				result = graph.runScatterGatherIteration(new MessageFunctionNoEdgeValue(), new UpdateFunction(),
-						NUM_ITERATIONS, parameters).getVertices();
+		assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
 
-				result.output(new DiscardingOutputFormat<Vertex<String, Double>>());
-			}
+		// validate that the semantic properties are set as they should
+		TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+		assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+		assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
 
+		TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
 
-			// ------------- validate the java program ----------------
-
-			assertTrue(result instanceof DeltaIterationResultSet);
-
-			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
-			DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
-
-			// check the basic iteration properties
-			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
-			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
-			assertEquals(ITERATION_parallelism, iteration.getParallelism());
-			assertEquals(ITERATION_NAME, iteration.getName());
-
-			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
-
-			// validate that the semantic properties are set as they should
-			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
-			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
-
-			TwoInputUdfOperator<?, ?, ?, ?> edgesJoin = (TwoInputUdfOperator<?, ?, ?, ?>) solutionSetJoin.getInput1();
-
-			// validate that the broadcast sets are forwarded
-			assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
-			assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+		// validate that the broadcast sets are forwarded
+		assertEquals(bcVar, solutionSetJoin.getBroadcastSets().get(BC_SET_UPDATES_NAME));
+		assertEquals(bcVar, edgesJoin.getBroadcastSets().get(BC_SET_MESSAGES_NAME));
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -219,12 +188,14 @@ public class SpargelTranslationTest {
 	private static class MessageFunctionNoEdgeValue extends ScatterFunction<String, Double, Long, NullValue> {
 
 		@Override
-		public void sendMessages(Vertex<String, Double> vertex) {}
+		public void sendMessages(Vertex<String, Double> vertex) {
+		}
 	}
 
 	private static class UpdateFunction extends GatherFunction<String, Double, Long> {
 
 		@Override
-		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {}
+		public void updateVertex(Vertex<String, Double> vertex, MessageIterator<Long> inMessages) {
+		}
 	}
 }
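
The translation tests above register broadcast sets but their scatter and gather functions are intentionally empty, so the sets are never read. As a sketch of the consuming side, hedged on the getBroadcastSet() accessor that the ITCases later in this commit use (requires java.util.Collection in addition to the imports above; the arithmetic is purely illustrative):

	private static class MessageFunctionWithBroadcast
			extends ScatterFunction<String, Double, Long, NullValue> {

		@Override
		public void sendMessages(Vertex<String, Double> vertex) {
			// the DataSet registered under BC_SET_MESSAGES_NAME arrives
			// as a Collection under the same name
			Collection<Long> bcMessaging = getBroadcastSet("broadcast messages");
			long offset = bcMessaging.iterator().next();
			sendMessageToAllNeighbors(vertex.getValue().longValue() + offset);
		}
	}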

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index 78f0ba2..2454b38 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -29,18 +29,19 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.ScatterFunction;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Dummy iteration to test that the supersteps are correctly incremented
+ * and can be retrieved from inside the scatter and gather functions.
+ * All vertices start with value 1 and increase their value by 1
+ * in each iteration.
+ */
 @SuppressWarnings("serial")
 public class CollectionModeSuperstepITCase extends TestLogger {
 
-	/**
-	 * Dummy iteration to test that the supersteps are correctly incremented
-	 * and can be retrieved from inside the scatter and gather functions.
-	 * All vertices start with value 1 and increase their value by 1
-	 * in each iteration. 
-	 */
 	@Test
 	public void testProgram() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
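
The class comment describes the iteration, but the diff context ends before the scatter and gather functions themselves. One plausible shape for them, assuming Gelly's getSuperstepNumber() and setNewVertexValue() accessors (the bodies are illustrative, not the file's actual code):

	private static class MessageFunction extends ScatterFunction<Long, Long, Long, NullValue> {
		@Override
		public void sendMessages(Vertex<Long, Long> vertex) {
			// vertices start at value 1, so the value tracks the superstep
			Assert.assertEquals(getSuperstepNumber(), vertex.getValue().longValue());
			sendMessageToAllNeighbors(1L);
		}
	}

	private static class UpdateFunction extends GatherFunction<Long, Long, Long> {
		@Override
		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
			Assert.assertEquals(getSuperstepNumber(), vertex.getValue().longValue());
			setNewVertexValue(vertex.getValue() + 1); // increase by 1 per iteration
		}
	}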

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 183522d..f866f38 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,6 +43,9 @@ import org.junit.runners.Parameterized;
 import java.util.HashSet;
 import java.util.List;
 
+/**
+ * Tests for {@link GSAConfiguration}.
+ */
 @RunWith(Parameterized.class)
 public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase {
 
@@ -235,7 +239,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet");
+			List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("gatherBcastSet");
 			Assert.assertEquals(1, bcastSet.get(0).intValue());
 			Assert.assertEquals(2, bcastSet.get(1).intValue());
 			Assert.assertEquals(3, bcastSet.get(2).intValue());
@@ -266,7 +270,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet");
+			List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("sumBcastSet");
 			Assert.assertEquals(4, bcastSet.get(0).intValue());
 			Assert.assertEquals(5, bcastSet.get(1).intValue());
 			Assert.assertEquals(6, bcastSet.get(2).intValue());
@@ -295,7 +299,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet");
+			List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("applyBcastSet");
 			Assert.assertEquals(7, bcastSet.get(0).intValue());
 			Assert.assertEquals(8, bcastSet.get(1).intValue());
 			Assert.assertEquals(9, bcastSet.get(2).intValue());
@@ -346,7 +350,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+	private static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
 		public Long map(Vertex<Long, Long> value) {
 			return 1L;
@@ -354,7 +358,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 	}
 
 	@SuppressWarnings("serial")
-	public static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
+	private static final class InitialiseHashSetMapper implements MapFunction<Vertex<Long, Long>, HashSet<Long>> {
 
 		@Override
 		public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index 3c091a9..139ff1e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.graph.spargel.ScatterGatherIteration;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +44,9 @@ import org.junit.runners.Parameterized;
 import java.util.HashSet;
 import java.util.List;
 
+/**
+ * Tests for {@link ScatterGatherConfiguration}.
+ */
 @RunWith(Parameterized.class)
 public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
@@ -133,7 +137,6 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
 			new MessageFunctionDefault(), new UpdateFunctionDefault(), 5);
 
-
 		DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>());
 		List<Tuple2<Long, Long>> result = data.collect();
 
@@ -333,7 +336,6 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expectedResult);
 	}
 
-
 	@Test
 	public void testNumVerticesNotSet() throws Exception {
 
@@ -508,7 +510,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet");
+			List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("messagingBcastSet");
 			Assert.assertEquals(4, bcastSet.get(0).intValue());
 			Assert.assertEquals(5, bcastSet.get(1).intValue());
 			Assert.assertEquals(6, bcastSet.get(2).intValue());
@@ -556,7 +558,7 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
 			// test bcast variable
 			@SuppressWarnings("unchecked")
-			List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet");
+			List<Integer> bcastSet = (List<Integer>) (List<?>) getBroadcastSet("updateBcastSet");
 			Assert.assertEquals(1, bcastSet.get(0).intValue());
 			Assert.assertEquals(2, bcastSet.get(1).intValue());
 			Assert.assertEquals(3, bcastSet.get(2).intValue());
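
The hunks above show only the consuming side of the broadcast sets. The producing side, sketched here with the values these assertions expect, uses the same ScatterGatherConfiguration methods already shown in SpargelTranslationTest:

	ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
	// read inside the messaging/scatter function as "messagingBcastSet" (4, 5, 6)
	parameters.addBroadcastSetForScatterFunction("messagingBcastSet", env.fromElements(4, 5, 6));
	// read inside the update/gather function as "updateBcastSet" (1, 2, 3)
	parameters.addBroadcastSetForGatherFunction("updateBcastSet", env.fromElements(1, 2, 3));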