Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 12:35:25 UTC

[2/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle for runtime/iterative

[FLINK-6880] [runtime] Activate checkstyle for runtime/iterative

This closes #4098.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/834c5277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/834c5277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/834c5277

Branch: refs/heads/master
Commit: 834c527783346f777bf5def4c61f1791b2a89473
Parents: 343a804
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:53:41 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../concurrent/BlockingBackChannel.java         | 13 ++-
 .../concurrent/BlockingBackChannelBroker.java   |  8 +-
 .../runtime/iterative/concurrent/Broker.java    | 17 ++--
 .../concurrent/IterationAggregatorBroker.java   | 11 ++-
 .../iterative/concurrent/SolutionSetBroker.java |  5 +-
 .../SolutionSetUpdateBarrierBroker.java         |  5 +-
 .../iterative/concurrent/SuperstepBarrier.java  | 22 +++--
 .../concurrent/SuperstepKickoffLatch.java       | 15 ++--
 .../concurrent/SuperstepKickoffLatchBroker.java |  7 +-
 .../WorksetEmptyConvergenceCriterion.java       | 10 +--
 .../iterative/event/AllWorkersDoneEvent.java    | 11 ++-
 .../event/IterationEventWithAggregators.java    | 49 ++++++------
 .../iterative/event/TerminationEvent.java       |  9 +--
 .../iterative/event/WorkerDoneEvent.java        | 23 +++---
 .../iterative/io/HashPartitionIterator.java     | 12 +--
 .../iterative/io/SerializedUpdateBuffer.java    | 21 ++---
 .../SolutionSetFastUpdateOutputCollector.java   | 12 +--
 ...SolutionSetObjectsUpdateOutputCollector.java | 14 ++--
 .../io/SolutionSetUpdateOutputCollector.java    | 14 ++--
 .../io/WorksetUpdateOutputCollector.java        |  9 +--
 .../iterative/task/AbstractIterativeTask.java   | 56 ++++++-------
 .../iterative/task/IterationHeadTask.java       | 66 +++++++--------
 .../task/IterationIntermediateTask.java         | 23 +++---
 .../task/IterationSynchronizationSinkTask.java  | 52 ++++++------
 .../iterative/task/IterationTailTask.java       | 21 ++---
 .../task/RuntimeAggregatorRegistry.java         | 30 +++----
 .../iterative/task/SyncEventHandler.java        | 25 +++---
 .../runtime/iterative/task/Terminable.java      |  3 +-
 .../concurrent/BlockingBackChannelTest.java     | 21 ++---
 .../iterative/concurrent/BrokerTest.java        |  8 +-
 .../iterative/concurrent/StringPair.java        |  1 -
 .../concurrent/SuperstepBarrierTest.java        | 15 ++--
 .../concurrent/SuperstepKickoffLatchTest.java   | 84 ++++++++++----------
 .../event/EventWithAggregatorsTest.java         | 59 +++++++-------
 35 files changed, 383 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 654227a..df905dc 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -441,7 +441,6 @@ under the License.
 						**/runtime/highavailability/**,
 						**/runtime/instance/**,
 						**/runtime/io/**,
-						**/runtime/iterative/**,
 						**/runtime/jobgraph/**,
 						**/runtime/jobmanager/**,
 						**/runtime/jobmaster/**,
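
The pom.xml hunk above removes **/runtime/iterative/** from the checkstyle exclusion list, so the checkstyle rules now apply to that package; the remaining hunks in this commit are the resulting fixes. As a rough summary of the conventions the diff converges on (inferred from the hunks below, with hypothetical package and class names): Javadoc first sentences end with a period, public classes carry class-level Javadoc, and imports are grouped Flink first, third-party second, java/javax last.

package org.apache.flink.runtime.iterative.example;             // hypothetical package

import org.apache.flink.runtime.event.TaskEvent;                 // Flink imports first

import org.slf4j.Logger;                                         // then third-party imports
import org.slf4j.LoggerFactory;

import java.util.concurrent.ArrayBlockingQueue;                  // java/javax imports last
import java.util.concurrent.BlockingQueue;

/**
 * Public classes now carry class-level Javadoc, and the first sentence
 * of every Javadoc comment ends with a period.
 */
public class ExampleCheckstyleLayout {

	/** Field comments are written as sentences ending with a period. */
	private static final Logger LOG = LoggerFactory.getLogger(ExampleCheckstyleLayout.class);

	/** A one-element queue, a pattern used by several classes in this package. */
	private final BlockingQueue<TaskEvent> events = new ArrayBlockingQueue<>(1);
}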

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
index 067bbfe..1373654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
@@ -16,27 +16,26 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
 
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 /**
  * A concurrent datastructure that establishes a backchannel buffer between an iteration head
  * and an iteration tail.
  */
 public class BlockingBackChannel {
 
-	/** buffer to send back the superstep results */
+	/** Buffer to send back the superstep results. */
 	private final SerializedUpdateBuffer buffer;
 
-	/** a one element queue used for blocking hand over of the buffer */
+	/** A one element queue used for blocking hand over of the buffer. */
 	private final BlockingQueue<SerializedUpdateBuffer> queue;
 
 	public BlockingBackChannel(SerializedUpdateBuffer buffer) {
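
The fields in the hunk above are the core of the back channel: a SerializedUpdateBuffer carrying the superstep results and a one-element blocking queue used for the blocking hand-over between iteration head and tail. A minimal sketch of that hand-over pattern, using plain byte arrays and illustrative class and method names rather than the actual BlockingBackChannel API:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Sketch of the one-element-queue hand-over BlockingBackChannel is built on.
 * Class and method names here are illustrative, not the Flink API.
 */
public class BackChannelHandOverSketch {

	// Capacity 1: offer() fails if the previous superstep result was not consumed yet.
	private final BlockingQueue<byte[]> handOver = new ArrayBlockingQueue<>(1);

	/** Tail side: publish the result of the finished superstep. */
	public void publish(byte[] superstepResult) {
		if (!handOver.offer(superstepResult)) {
			throw new IllegalStateException("previous superstep result not yet consumed");
		}
	}

	/** Head side: block until the tail has handed the buffer over. */
	public byte[] awaitFeedback() throws InterruptedException {
		return handOver.take();
	}
}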

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
index daa3ec3..120ecf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 /**
- * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails
+ * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails.
  */
 public class BlockingBackChannelBroker extends Broker<BlockingBackChannel> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final BlockingBackChannelBroker INSTANCE = new BlockingBackChannelBroker();
 
 	private BlockingBackChannelBroker() {}
 
 	/**
-	 * retrieve singleton instance
+	 * Retrieve singleton instance.
 	 */
 	public static Broker<BlockingBackChannel> instance() {
 		return INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
index 444d21f..6816af4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 import java.util.concurrent.ArrayBlockingQueue;
@@ -25,14 +24,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * A concurrent data structure that allows the hand-over of an object between a pair of threads
+ * A concurrent data structure that allows the hand-over of an object between a pair of threads.
  */
 public class Broker<V> {
 
 	private final ConcurrentMap<String, BlockingQueue<V>> mediations = new ConcurrentHashMap<String, BlockingQueue<V>>();
 
 	/**
-	 * hand in the object to share
+	 * Hand in the object to share.
 	 */
 	public void handIn(String key, V obj) {
 		if (!retrieveSharedQueue(key).offer(obj)) {
@@ -40,7 +39,7 @@ public class Broker<V> {
 		}
 	}
 
-	/** blocking retrieval and removal of the object to share */
+	/** Blocking retrieval and removal of the object to share. */
 	public V getAndRemove(String key) {
 		try {
 			V objToShare = retrieveSharedQueue(key).take();
@@ -50,13 +49,13 @@ public class Broker<V> {
 			throw new RuntimeException(e);
 		}
 	}
-	
-	/** blocking retrieval and removal of the object to share */
+
+	/** Blocking retrieval and removal of the object to share. */
 	public void remove(String key) {
 		mediations.remove(key);
 	}
-	
-	/** blocking retrieval and removal of the object to share */
+
+	/** Blocking retrieval and removal of the object to share. */
 	public V get(String key) {
 		try {
 			BlockingQueue<V> queue = retrieveSharedQueue(key);
@@ -71,7 +70,7 @@ public class Broker<V> {
 	}
 
 	/**
-	 * thread-safe call to get a shared {@link BlockingQueue}
+	 * Thread-safe call to get a shared {@link BlockingQueue}.
 	 */
 	private BlockingQueue<V> retrieveSharedQueue(String key) {
 		BlockingQueue<V> queue = mediations.get(key);
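
Broker is a name-keyed rendezvous point: one thread calls handIn(key, obj), the other blocks in get(key) or getAndRemove(key) until the object arrives, and retrieveSharedQueue lazily creates the shared queue per key. A rough usage sketch, assuming both sides agree on the same broker key string (the key format and the thread wrapper below are illustrative, not taken from this commit):

import org.apache.flink.runtime.iterative.concurrent.Broker;

/**
 * Rough sketch of a hand-over through a Broker; the key format is illustrative.
 */
public class BrokerHandOverSketch {

	public static void main(String[] args) throws Exception {
		final Broker<StringBuilder> broker = new Broker<>();
		final String brokerKey = "jobId#iterationId#subtaskIndex";   // illustrative key format

		// One side (e.g. the iteration head) hands the shared object in.
		Thread producer = new Thread(() -> broker.handIn(brokerKey, new StringBuilder("shared state")));
		producer.start();

		// The other side blocks until the object arrives, then removes the mapping.
		StringBuilder shared = broker.getAndRemove(brokerKey);
		System.out.println(shared);

		producer.join();
	}
}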

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
index bf509f6..249a492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
@@ -16,17 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
 
+/**
+ * {@link Broker} for {@link RuntimeAggregatorRegistry}.
+ */
 public class IterationAggregatorBroker extends Broker<RuntimeAggregatorRegistry> {
-	
-	/** single instance */
+
 	private static final IterationAggregatorBroker INSTANCE = new IterationAggregatorBroker();
 
-	/** retrieve singleton instance */
+	/**
+	 * Retrieve singleton instance.
+	 */
 	public static IterationAggregatorBroker instance() {
 		return INSTANCE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
index 3d9d0ea..f9ba0c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
@@ -23,9 +23,6 @@ package org.apache.flink.runtime.iterative.concurrent;
  */
 public class SolutionSetBroker extends Broker<Object> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final SolutionSetBroker INSTANCE = new SolutionSetBroker();
 
 	/**
@@ -34,6 +31,6 @@ public class SolutionSetBroker extends Broker<Object> {
 	public static Broker<Object> instance() {
 		return INSTANCE;
 	}
-	
+
 	private SolutionSetBroker() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index 352a262..9a1e40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -22,15 +22,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
 import org.apache.flink.runtime.iterative.task.IterationTailTask;
 
 /**
- * Broker to hand over {@link SolutionSetUpdateBarrier} from 
+ * Broker to hand over {@link SolutionSetUpdateBarrier} from
  * {@link IterationHeadTask} to
  * {@link IterationTailTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final SolutionSetUpdateBarrierBroker INSTANCE = new SolutionSetUpdateBarrierBroker();
 
 	private SolutionSetUpdateBarrierBroker() {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index cc5d3c5..54bb870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.types.Value;
 
+import java.util.concurrent.CountDownLatch;
+
 /**
  * A resettable one-shot latch.
  */
 public class SuperstepBarrier implements EventListener<TaskEvent> {
-	
+
 	private final ClassLoader userCodeClassLoader;
 
 	private boolean terminationSignaled = false;
@@ -40,19 +39,17 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
 
 	private String[] aggregatorNames;
 	private Value[] aggregates;
-	
-	
+
 	public SuperstepBarrier(ClassLoader userCodeClassLoader) {
 		this.userCodeClassLoader = userCodeClassLoader;
 	}
-	
 
-	/** setup the barrier, has to be called at the beginning of each superstep */
+	/** Setup the barrier, has to be called at the beginning of each superstep. */
 	public void setup() {
 		latch = new CountDownLatch(1);
 	}
 
-	/** wait on the barrier */
+	/** Wait on the barrier. */
 	public void waitForOtherWorkers() throws InterruptedException {
 		latch.await();
 	}
@@ -60,13 +57,12 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
 	public String[] getAggregatorNames() {
 		return aggregatorNames;
 	}
-	
+
 	public Value[] getAggregates() {
 		return aggregates;
 	}
 
-	/** barrier will release the waiting thread if an event occurs
-	 * @param event*/
+	/** Barrier will release the waiting thread if an event occurs. */
 	@Override
 	public void onEvent(TaskEvent event) {
 		if (event instanceof TerminationEvent) {
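
The SuperstepBarrier lifecycle shown above is: setup() arms a fresh one-shot latch for the superstep, waitForOtherWorkers() blocks on it, and onEvent(TaskEvent) releases the waiting thread when a sync event arrives. A minimal sketch, assuming (per the Javadoc in the hunk) that delivering either an AllWorkersDoneEvent or a TerminationEvent releases the latch:

import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
import org.apache.flink.runtime.iterative.event.TerminationEvent;

/**
 * Minimal sketch: arm the barrier, release it from another thread, wait on it.
 */
public class SuperstepBarrierSketch {

	public static void main(String[] args) throws InterruptedException {
		SuperstepBarrier barrier = new SuperstepBarrier(SuperstepBarrierSketch.class.getClassLoader());

		// Arm a fresh one-shot latch for this superstep.
		barrier.setup();

		// Stand-in for the task-event dispatch that normally calls onEvent(...);
		// here the iteration is told to terminate right away.
		new Thread(() -> barrier.onEvent(TerminationEvent.INSTANCE)).start();

		// Returns once the delivered event has released the latch.
		barrier.waitForOtherWorkers();
		System.out.println("barrier released");
	}
}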

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
index 83b7a4a..b86ade2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+/**
+ * Latch used to wait for the previous superstep to complete.
+ */
 public class SuperstepKickoffLatch {
-	
+
 	private final Object monitor = new Object();
-	
+
 	private int superstepNumber = 1;
-	
+
 	private boolean terminated;
-	
+
 	public void triggerNextSuperstep() {
 		synchronized (monitor) {
 			if (terminated) {
@@ -35,14 +38,14 @@ public class SuperstepKickoffLatch {
 			monitor.notifyAll();
 		}
 	}
-	
+
 	public void signalTermination() {
 		synchronized (monitor) {
 			terminated = true;
 			monitor.notifyAll();
 		}
 	}
-	
+
 	public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
 		while (true) {
 			synchronized (monitor) {
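
SuperstepKickoffLatch coordinates the start of the next superstep: one side calls triggerNextSuperstep() or signalTermination(), the other blocks in awaitStartOfSuperstepOrTermination(superstep). A small sketch of that hand-shake; the meaning of the returned boolean (assumed here to indicate termination) is not visible in the hunk above:

import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;

/**
 * Sketch of the kickoff hand-shake. The boolean returned by
 * awaitStartOfSuperstepOrTermination is assumed to mean "terminated".
 */
public class SuperstepKickoffSketch {

	public static void main(String[] args) throws InterruptedException {
		SuperstepKickoffLatch kickoff = new SuperstepKickoffLatch();

		// Head side (see IterationHeadTask below): allow superstep 2 to start.
		new Thread(kickoff::triggerNextSuperstep).start();

		// Waiting side: block until superstep 2 may start or the iteration terminated.
		boolean terminated = kickoff.awaitStartOfSuperstepOrTermination(2);
		System.out.println(terminated ? "iteration terminated" : "superstep 2 may start");
	}
}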

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
index f137680..3b545c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+/**
+ * {@link Broker} for {@link SuperstepKickoffLatch}.
+ */
 public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
 
 	private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
 
 	private SuperstepKickoffLatchBroker() {}
 
-
+	/**
+	 * Retrieve the singleton instance.
+	 */
 	public static Broker<SuperstepKickoffLatch> instance() {
 		return INSTANCE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index 2987b89..19c26bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.convergence;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.types.LongValue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A workset iteration is by definition converged if no records have been updated in the solutionset
+ * A workset iteration is by definition converged if no records have been updated in the solutionset.
  */
 public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger log = LoggerFactory.getLogger(WorksetEmptyConvergenceCriterion.class);
-	
+
 	public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
 
 	@Override
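
WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue>, reporting convergence once no solution-set records were updated in a superstep. A hedged sketch of a criterion in the same spirit, assuming the interface's single method is isConverged(int, T) (the hunk above does not show it); the threshold logic is illustrative, not the code of this class:

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.LongValue;

/**
 * Illustrative criterion in the same spirit: converged once the tracked
 * count drops to zero. Not the code of WorksetEmptyConvergenceCriterion.
 */
public class EmptyCountConvergenceSketch implements ConvergenceCriterion<LongValue> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isConverged(int iteration, LongValue count) {
		// Assumed interface method: converged when nothing was updated in the last superstep.
		return count.getValue() == 0;
	}
}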

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
index 2fd0db1..e62bf78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
@@ -16,19 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 
+import java.util.Map;
+
+/**
+ * Event sent by the {@code IterationSynchronizationSinkTask} to each
+ * {@code IterationHead} signaling to start a new superstep.
+ */
 public class AllWorkersDoneEvent extends IterationEventWithAggregators {
 
 	public AllWorkersDoneEvent() {
 		super();
 	}
-	
+
 	public AllWorkersDoneEvent(Map<String, Aggregator<?>> aggregators) {
 		super(aggregators);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index 4e1c19e..06b7687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -32,13 +27,21 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for iteration {@link TaskEvent} transmitting operator aggregators.
+ */
 public abstract class IterationEventWithAggregators extends TaskEvent {
-	
+
 	protected static final String[] NO_STRINGS = new String[0];
 	protected static final Value[] NO_VALUES = new Value[0];
-	
+
 	private String[] aggNames;
-	
+
 	private String[] classNames;
 	private byte[][] serializedData;
 
@@ -53,11 +56,11 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 		if (aggregatorName == null || aggregate == null) {
 			throw new NullPointerException();
 		}
-		
+
 		this.aggNames = new String[] { aggregatorName };
 		this.aggregates = new Value[] { aggregate };
 	}
-	
+
 	protected IterationEventWithAggregators(Map<String, Aggregator<?>> aggregators) {
 		int num = aggregators.size();
 		if (num == 0) {
@@ -66,7 +69,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 		} else {
 			this.aggNames = new String[num];
 			this.aggregates = new Value[num];
-			
+
 			int i = 0;
 			for (Map.Entry<String, Aggregator<?>> entry : aggregators.entrySet()) {
 				this.aggNames[i] = entry.getKey();
@@ -75,7 +78,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 			}
 		}
 	}
-	
+
 	public String[] getAggregatorNames() {
 		return this.aggNames;
 	}
@@ -97,21 +100,19 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 				catch (ClassCastException e) {
 					throw new RuntimeException("User-defined aggregator class is not a value sublass.");
 				}
-				
-				
+
 				try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(serializedData[i])))
-				{
+					new ByteArrayInputStream(serializedData[i]))) {
 					v.read(in);
 				}
 				catch (IOException e) {
 					throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
 				}
-				
+
 				aggregates[i] = v;
 			}
 		}
-		
+
 		return this.aggregates;
 	}
 
@@ -119,15 +120,15 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 	public void write(DataOutputView out) throws IOException {
 		int num = this.aggNames.length;
 		out.writeInt(num);
-		
+
 		ByteArrayOutputStream boas = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper bufferStream = new DataOutputViewStreamWrapper(boas);
-		
+
 		for (int i = 0; i < num; i++) {
 			// aggregator name and type
 			out.writeUTF(this.aggNames[i]);
 			out.writeUTF(this.aggregates[i].getClass().getName());
-			
+
 			// aggregator value indirect as a byte array
 			this.aggregates[i].write(bufferStream);
 			bufferStream.flush();
@@ -160,14 +161,14 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 			for (int i = 0; i < num; i++) {
 				this.aggNames[i] = in.readUTF();
 				this.classNames[i] = in.readUTF();
-				
+
 				int len = in.readInt();
 				byte[] data = new byte[len];
 				this.serializedData[i] = data;
 				in.readFully(data);
-				
+
 			}
-			
+
 			this.aggregates = null;
 		}
 	}
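
The write()/read() hunks above imply a simple wire layout for these events: an int count, then per aggregator its name (UTF), the class name of its Value (UTF), and a length-prefixed byte blob holding the serialized value, which is deserialized later from the stored byte array. A small sketch that walks that layout with plain java.io streams (the class name is hypothetical):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Walks the event layout implied by the hunks above: count, then per
 * aggregator name, value class name, and a length-prefixed payload.
 */
public class AggregatorEventLayoutSketch {

	public static void dump(InputStream raw) throws IOException {
		DataInputStream in = new DataInputStream(raw);
		int num = in.readInt();                  // number of aggregators
		for (int i = 0; i < num; i++) {
			String name = in.readUTF();          // aggregator name
			String valueClass = in.readUTF();    // class of the serialized Value
			int len = in.readInt();              // length of the serialized payload
			byte[] payload = new byte[len];
			in.readFully(payload);               // serialized Value bytes, deserialized later
			System.out.println(name + " (" + valueClass + "): " + len + " bytes");
		}
	}
}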

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 28181e8..f1523df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.IOException;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.TaskEvent;
 
+import java.io.IOException;
+
 /**
- * Signals that the iteration is completely executed, participating tasks must terminate now
+ * Signals that the iteration is completely executed, participating tasks must terminate now.
  */
 public class TerminationEvent extends TaskEvent {
 
 	public static final TerminationEvent INSTANCE = new TerminationEvent();
-	
+
 	@Override
 	public void write(DataOutputView out) throws IOException {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
index 2e348e9..ce24f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Completion event sent from each {@code IterationHead} to the
+ * {@code IterationSynchronizationSinkTask}.
+ */
 public class WorkerDoneEvent extends IterationEventWithAggregators {
-	
+
 	private int workerIndex;
-	
+
 	public WorkerDoneEvent() {
 		super();
 	}
@@ -39,22 +42,22 @@ public class WorkerDoneEvent extends IterationEventWithAggregators {
 		super(aggregatorName, aggregate);
 		this.workerIndex = workerIndex;
 	}
-	
+
 	public WorkerDoneEvent(int workerIndex, Map<String, Aggregator<?>> aggregators) {
 		super(aggregators);
 		this.workerIndex = workerIndex;
 	}
-	
+
 	public int getWorkerIndex() {
 		return workerIndex;
 	}
-	
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.workerIndex);
 		super.write(out);
 	}
-	
+
 	@Override
 	public void read(DataInputView in) throws IOException {
 		this.workerIndex = in.readInt();

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 93ae55f..5bd3131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.hash.HashPartition;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Iterator;
+
 /**
- * {@link Iterator} over the build side entries of a {@link HashPartition}
- * 
+ * {@link Iterator} over the build side entries of a {@link HashPartition}.
+ *
  * @param <BT>
  */
 public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index 7776894..353dcca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -16,9 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -28,14 +35,10 @@ import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-
+/**
+ * {@link AbstractPagedOutputView} used by the {@code BlockingBackChannel} for
+ * transmitting superstep results.
+ */
 public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 	private static final int HEADER_LENGTH = 4;

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index f326d89..10962d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
  * is for example the case when a solution set update happens directly after a solution set join. If this assumption
  * doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
index 21a9cc8..6a57dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
@@ -24,12 +24,12 @@ import org.apache.flink.util.Collector;
 
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a HashTable hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
  * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- * 
+ *
  * @see SolutionSetFastUpdateOutputCollector
  */
 public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T> {
@@ -37,9 +37,9 @@ public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T>
 	private final Collector<T> delegate;
 
 	private final JoinHashMap<T> hashMap;
-	
+
 	private final TypeSerializer<T> serializer;
-	
+
 	public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap) {
 		this(hashMap, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index c39efa5..90041c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a HashTable hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
  * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to the save re-probing.
- * 
+ *
  * @see SolutionSetFastUpdateOutputCollector
  */
 public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
index ad1d274..329e768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the iteration workset (partial solution for bulk iterations).
- * <p>
- * The records are written to a {@link DataOutputView} to allow in-memory data exchange.
+ *
+ * <p>The records are written to a {@link DataOutputView} to allow in-memory data exchange.
  */
 public class WorksetUpdateOutputCollector<T> implements Collector<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 047fd7e..bde358c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -21,11 +21,6 @@ package org.apache.flink.runtime.iterative.task;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.Function;
@@ -33,7 +28,9 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCri
 import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,6 +53,9 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
@@ -64,10 +65,9 @@ import java.util.concurrent.Future;
  * The abstract base class for all tasks able to participate in an iteration.
  */
 public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
-		implements Terminable
-{
+		implements Terminable {
 	private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
-	
+
 	protected LongSumAggregator worksetAggregator;
 
 	protected BlockingBackChannel worksetBackChannel;
@@ -77,14 +77,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	protected boolean isWorksetUpdate;
 
 	protected boolean isSolutionSetUpdate;
-	
 
 	private RuntimeAggregatorRegistry iterationAggregators;
 
 	private String brokerKey;
 
 	private int superstepNum = 1;
-	
+
 	private volatile boolean terminationRequested;
 
 	// --------------------------------------------------------------------------------------------
@@ -105,7 +104,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 				}
 			}
 		}
-		
+
 		TaskConfig config = getLastTasksConfig();
 		isWorksetIteration = config.getIsWorksetIteration();
 		isWorksetUpdate = config.getIsWorksetUpdate();
@@ -134,7 +133,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 		} else {
 			reinstantiateDriver();
 			resetAllInputs();
-			
+
 			// re-read the iterative broadcast variables
 			for (int i : this.iterativeBroadcastInputs) {
 				final String name = getTaskConfig().getBroadcastInputName(i);
@@ -144,7 +143,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 		// call the parent to execute the superstep
 		super.run();
-		
+
 		// release the iterative broadcast variables
 		for (int i : this.iterativeBroadcastInputs) {
 			final String name = getTaskConfig().getBroadcastInputName(i);
@@ -244,8 +243,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 					@SuppressWarnings("unchecked")
 					MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
 					Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
-					while ((o = inIter.next(o)) != null);
-					
+					while ((o = inIter.next(o)) != null) {
+					}
+
 					if (!reader.isFinished()) {
 						// also reset the end-of-superstep state
 						reader.startNextSuperstep();
@@ -253,17 +253,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 				}
 			}
 		}
-		
+
 		for (int inputNum : this.iterativeBroadcastInputs) {
 			MutableReader<?> reader = this.broadcastInputReaders[inputNum];
 
 			if (!reader.isFinished()) {
-				
+
 				// sanity check that the BC input is at the end of the superstep
 				if (!reader.hasReachedEndOfSuperstep()) {
 					throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
 				}
-				
+
 				reader.startNextSuperstep();
 			}
 		}
@@ -291,11 +291,11 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 	/**
 	 * Creates a new {@link WorksetUpdateOutputCollector}.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 *
+	 * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
 	 * workset.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
+	 *
+	 * <p>If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
 	 * collect(T) of the delegate.
 	 *
 	 * @param delegate null -OR- the delegate on which to call collect() by the newly created collector
@@ -313,13 +313,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 	/**
 	 * Creates a new solution set update output collector.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 *
+	 * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
 	 * solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
 	 * {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
 	 * {@link SolutionSetUpdateOutputCollector} is created.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
+	 *
+	 * <p>If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
 	 * collect(T) of the delegate.
 	 *
 	 * @param delegate null -OR- a delegate collector to be called by the newly created collector
@@ -328,7 +328,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	 */
 	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
 		Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-		
+
 		Object ss = solutionSetBroker.get(brokerKey());
 		if (ss instanceof CompactingHashTable) {
 			@SuppressWarnings("unchecked")
@@ -363,7 +363,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
 		public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
-											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulatorMap, MetricGroup metrics) {
+											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
 			super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 575072d..b673ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.operators.Driver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -39,6 +27,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
 import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -54,12 +46,20 @@ import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * The head is responsible for coordinating an iteration and can run a
  * {@link Driver} inside. It will read
@@ -71,11 +71,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
  * iteration is done, the head
  * will send a {@link TerminationEvent} to all it's connected tasks, signaling them to shutdown.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ *
+ * <p>Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
  * step function. - The next m output gates to to the tasks that consume the final solution. - The last output gate
  * connects to the synchronization task.
- * 
+ *
  * @param <X>
  *        The type of the bulk partial solution / solution set and the final output.
  * @param <Y>
@@ -131,8 +131,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 	}
 
 	/**
-	 * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
-	 * hands it to the iteration tail via a {@link Broker} singleton
+	 * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+	 * hands it to the iteration tail via a {@link Broker} singleton.
 	 **/
 	private BlockingBackChannel initBackChannel() throws Exception {
 
@@ -154,7 +154,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 		return backChannel;
 	}
-	
+
 	private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
 		// get some memory
 		double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
@@ -162,7 +162,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
 		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-	
+
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
 
@@ -194,27 +194,27 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			}
 		}
 	}
-	
+
 	private <BT> JoinHashMap<BT> initJoinHashMap() {
 		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
 				(getUserCodeClassLoader());
 		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
 				(getUserCodeClassLoader());
-	
+
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-		
+
 		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
 	}
-	
+
 	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		solutionSet.open();
 		solutionSet.buildTableWithUniqueKey(solutionSetInput);
 	}
-	
+
 	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-		
+
 		X next;
 		while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
 			solutionSet.insertOrReplace(next);
@@ -232,12 +232,12 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 	public void run() throws Exception {
 		final String brokerKey = brokerKey();
 		final int workerIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
-		
+
 		final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
 
 		CompactingHashTable<X> solutionSet = null; // if workset iteration
 		JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-		
+
 		boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
 		boolean isWorksetIteration = config.getIsWorksetIteration();
 
@@ -245,7 +245,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			/* used for receiving the current iteration result from iteration tail */
 			SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
 			SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-			
+
 			BlockingBackChannel backChannel = initBackChannel();
 			SuperstepBarrier barrier = initSuperstepBarrier();
 			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -262,7 +262,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 				// setup the index for the solution set
 				@SuppressWarnings("unchecked")
 				MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-				
+
 				// read the initial solution set
 				if (objectSolutionSet) {
 					solutionSetObjectMap = initJoinHashMap();
@@ -284,7 +284,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 				@SuppressWarnings("unchecked")
 				TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
 				solutionTypeSerializer = solSer;
-				
+
 				// = termination Criterion tail
 				if (waitForSolutionSetUpdate) {
 					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
@@ -352,7 +352,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 					String[] globalAggregateNames = barrier.getAggregatorNames();
 					Value[] globalAggregates = barrier.getAggregates();
 					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-					
+
 					nextStepKickoff.triggerNextSuperstep();
 				}
 			}
@@ -398,7 +398,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			out.collect(record);
 		}
 	}
-	
+
 	private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
 		final MutableObjectIterator<X> results = hashTable.getEntryIterator();
 		final Collector<X> output = this.finalOutputCollector;
@@ -408,7 +408,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			output.collect(record);
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) throws IOException {
 		final Collector<X> output = this.finalOutputCollector;

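For context on the hand-over visible above (SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff) on the head side, with the matching get(brokerKey()) calls in the tail and intermediate tasks): head and tail run in the same JVM and exchange objects through keyed, blocking broker structures. The following is a minimal, self-contained sketch of that hand-over pattern, assuming nothing beyond the JDK; the class and method names are illustrative and this is not Flink's actual Broker implementation.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified hand-over broker: one task hands an object in under a key
// (e.g. the iteration's broker key), another task blocks until it is available.
public class HandOverBrokerSketch<V> {

    private final ConcurrentMap<String, BlockingQueue<V>> mailboxes = new ConcurrentHashMap<>();

    // Hand in the object under the given key; fails if something was already handed in.
    public void handIn(String key, V obj) {
        if (!mailbox(key).offer(obj)) {
            throw new IllegalStateException("an object was already handed in for key " + key);
        }
    }

    // Block until an object has been handed in under the given key, then consume it.
    public V get(String key) throws InterruptedException {
        return mailbox(key).take();
    }

    // Remove the key once the iteration finishes to avoid leaking state.
    public void remove(String key) {
        mailboxes.remove(key);
    }

    private BlockingQueue<V> mailbox(String key) {
        return mailboxes.computeIfAbsent(key, k -> new ArrayBlockingQueue<>(1));
    }
}
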
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 16a7008..c5fd133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,11 +35,11 @@ import java.io.IOException;
 
 /**
  * An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.Driver} inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
+ *
+ * <p>It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
  * intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
  * a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
  * this task must be scheduled on the same instance as the head.
  */
@@ -64,7 +65,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 			if (isSolutionSetUpdate) {
 				throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
 			}
-			
+
 			Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
 
 			// we need the WorksetUpdateOutputCollector separately to count the collected elements
@@ -80,7 +81,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 
 	@Override
 	public void run() throws Exception {
-		
+
 		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
 
 		while (this.running && !terminationRequested()) {
@@ -98,19 +99,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
 				worksetAggregator.aggregate(numCollected);
 			}
-			
+
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
-			
+
 			// let the successors know that the end of this superstep data is reached
 			sendEndOfSuperstep();
-			
+
 			if (isWorksetUpdate) {
 				// notify iteration head if responsible for workset update
 				worksetBackChannel.notifyOfEndOfSuperstep();
 			}
-			
+
 			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
 
 			if (terminated) {

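The run() loop above blocks on nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1) until the head task either triggers the next superstep or signals termination. A minimal wait/notify sketch of such a latch is shown below; the name is illustrative, the semantics are simplified (no sanity checks on superstep numbers), and this is not Flink's actual SuperstepKickoffLatch.

// Simplified superstep kickoff latch: tasks wait until a given superstep may
// start or the whole iteration is terminated.
public class KickoffLatchSketch {

    private final Object monitor = new Object();

    private int superstepNumber = 1;

    private boolean terminated;

    // Called by the head once all workers finished the current superstep.
    public void triggerNextSuperstep() {
        synchronized (monitor) {
            superstepNumber++;
            monitor.notifyAll();
        }
    }

    // Called by the head once the iteration has converged or reached the maximum iteration count.
    public void signalTermination() {
        synchronized (monitor) {
            terminated = true;
            monitor.notifyAll();
        }
    }

    // Returns true if the iteration terminated, false once the given superstep may start.
    public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
        synchronized (monitor) {
            while (!terminated && superstepNumber < superstep) {
                monitor.wait();
            }
            return terminated;
        }
    }
}
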
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 11a8cfa..6a38fcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -16,31 +16,31 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * The task responsible for synchronizing all iteration heads, implemented as an output task. This task
  * will never see any data.
@@ -52,13 +52,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
 
 	private MutableRecordReader<IntValue> headEventReader;
-	
+
 	private SyncEventHandler eventHandler;
 
 	private ConvergenceCriterion<Value> convergenceCriterion;
 
 	private ConvergenceCriterion<Value> implicitConvergenceCriterion;
-	
+
 	private Map<String, Aggregator<?>> aggregators;
 
 	private String convergenceAggregatorName;
@@ -66,13 +66,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private String implicitConvergenceAggregatorName;
 
 	private int currentIteration = 1;
-	
+
 	private int maxNumberOfIterations;
 
 	private final AtomicBoolean terminated = new AtomicBoolean(false);
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void invoke() throws Exception {
 		this.headEventReader = new MutableRecordReader<>(
@@ -80,13 +80,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 				getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
 		TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
-		
+
 		// store all aggregators
 		this.aggregators = new HashMap<>();
 		for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(getUserCodeClassLoader())) {
 			aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
 		}
-		
+
 		// store the aggregator convergence criterion
 		if (taskConfig.usesConvergenceCriterion()) {
 			convergenceCriterion = taskConfig.getConvergenceCriterion(getUserCodeClassLoader());
@@ -100,9 +100,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 			implicitConvergenceAggregatorName = taskConfig.getImplicitConvergenceCriterionAggregatorName();
 			Preconditions.checkNotNull(implicitConvergenceAggregatorName);
 		}
-		
+
 		maxNumberOfIterations = taskConfig.getNumberOfIterations();
-		
+
 		// set up the event handler
 		int numEventsTillEndOfSuperstep = taskConfig.getNumberOfEventsUntilInterruptInIterativeGate(0);
 		eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, aggregators,
@@ -110,7 +110,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 		headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
 
 		IntValue dummy = new IntValue();
-		
+
 		while (!terminationRequested()) {
 
 			if (log.isInfoEnabled()) {
@@ -140,7 +140,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 
 				AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent(aggregators);
 				sendToAllWorkers(allWorkersDoneEvent);
-				
+
 				// reset all aggregators
 				for (Aggregator<?> agg : aggregators.values()) {
 					agg.reset();
@@ -165,7 +165,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 			if (aggregator == null) {
 				throw new RuntimeException("Error: Aggregator for convergence criterion was null.");
 			}
-			
+
 			Value aggregate = aggregator.getAggregate();
 
 			if (convergenceCriterion.isConverged(currentIteration, aggregate)) {
@@ -194,14 +194,14 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 				return true;
 			}
 		}
-		
+
 		return false;
 	}
 
 	private void readHeadEventChannel(IntValue rec) throws IOException {
 		// reset the handler
 		eventHandler.resetEndOfSuperstep();
-		
+
 		// read (and thereby process all events in the handler's event handling functions)
 		try {
 			if (this.headEventReader.next(rec)) {
@@ -222,9 +222,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private String formatLogString(String message) {
 		return BatchTask.constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean terminationRequested() {
 		return terminated.get();

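The sync task checks a user-supplied ConvergenceCriterion against the aggregate reported for the finished superstep (see the convergenceCriterion.isConverged(currentIteration, aggregate) call above). Below is a hedged sketch of such a criterion in the spirit of WorksetEmptyConvergenceCriterion: the iteration is considered converged once the aggregated count drops to zero. The class name is illustrative.

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.LongValue;

// Converges as soon as the aggregated long value (e.g. the number of workset
// elements produced in the last superstep) is zero.
public class ZeroCountConvergenceSketch implements ConvergenceCriterion<LongValue> {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean isConverged(int iteration, LongValue count) {
        return count.getValue() == 0L;
    }
}
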
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 9e0b560..3ec3a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
@@ -28,15 +26,18 @@ import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
  * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
  * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
  * and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ *
+ * <p>If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
 public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
 
@@ -45,7 +46,6 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 	private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
 
 	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-	
 
 	@Override
 	protected void initialize() throws Exception {
@@ -80,6 +80,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 				outputCollector = new Collector<OT>() {
 					@Override
 					public void collect(OT record) {}
+
 					@Override
 					public void close() {}
 				};
@@ -95,9 +96,9 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 
 	@Override
 	public void run() throws Exception {
-		
+
 		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-		
+
 		while (this.running && !terminationRequested()) {
 
 			if (log.isInfoEnabled()) {
@@ -119,7 +120,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
-			
+
 			if (isWorksetUpdate) {
 				// notify iteration head if responsible for workset update
 				worksetBackChannel.notifyOfEndOfSuperstep();

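The worksetUpdateOutputCollector referenced above both forwards records toward the back channel and counts them, so the element count can later be fed into the workset aggregator. A simplified, self-contained sketch of such a counting, delegating collector (not the actual WorksetUpdateOutputCollector) could look like this:

import org.apache.flink.util.Collector;

// Delegating collector that counts every record it forwards; the count is
// read out and reset once per superstep.
public class CountingCollectorSketch<T> implements Collector<T> {

    private final Collector<T> delegate;

    private long elementsCollected;

    public CountingCollectorSketch(Collector<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void collect(T record) {
        elementsCollected++;
        delegate.collect(record);
    }

    @Override
    public void close() {
        delegate.close();
    }

    // Returns the number of records collected since the last reset, then resets the counter.
    public long getElementsCollectedAndReset() {
        long count = elementsCollected;
        elementsCollected = 0;
        return count;
    }
}
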
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
index 7beb4c7..c4a30c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
@@ -18,56 +18,56 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.types.Value;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  *
  */
 public class RuntimeAggregatorRegistry {
-	
+
 	private final Map<String, Aggregator<?>> aggregators;
-	
+
 	private final Map<String, Value> previousGlobalAggregate;
-	
+
 	public RuntimeAggregatorRegistry(Collection<AggregatorWithName<?>> aggs) {
 		this.aggregators = new HashMap<String, Aggregator<?>>();
 		this.previousGlobalAggregate = new HashMap<String, Value>();
-		
+
 		for (AggregatorWithName<?> agg : aggs) {
 			this.aggregators.put(agg.getName(), agg.getAggregator());
 		}
 	}
-	
+
 	public Value getPreviousGlobalAggregate(String name) {
 		return this.previousGlobalAggregate.get(name);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public <T extends Aggregator<?>> T getAggregator(String name) {
 		return (T) this.aggregators.get(name);
 	}
-	
+
 	public Map<String, Aggregator<?>> getAllAggregators() {
 		return this.aggregators;
 	}
-	
+
 	public void updateGlobalAggregatesAndReset(String[] names, Value[] aggregates) {
 		if (names == null || aggregates == null || names.length != aggregates.length) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		// add global aggregates
-		for (int i = 0 ; i < names.length; i++) {
+		for (int i = 0; i < names.length; i++) {
 			this.previousGlobalAggregate.put(names[i], aggregates[i]);
 		}
-		
+
 		// reset all aggregators
 		for (Aggregator<?> agg : this.aggregators.values()) {
 			agg.reset();

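The registry above implements an aggregate-then-publish-then-reset cycle: partial values are aggregated locally during a superstep, the globally combined results are stored as the previous global aggregates, and all aggregators are reset for the next superstep. A small, self-contained usage sketch of that cycle follows; the aggregator name is chosen purely for illustration.

import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.types.LongValue;

import java.util.HashMap;
import java.util.Map;

public class AggregatorCycleSketch {

    public static void main(String[] args) {
        Map<String, Aggregator<?>> aggregators = new HashMap<>();
        Map<String, LongValue> previousGlobalAggregate = new HashMap<>();

        LongSumAggregator updates = new LongSumAggregator();
        aggregators.put("updates", updates);

        // during a superstep: local partial aggregation
        updates.aggregate(42L);

        // at the superstep barrier: publish the combined value for the next superstep ...
        previousGlobalAggregate.put("updates", updates.getAggregate());

        // ... and reset all aggregators, as updateGlobalAggregatesAndReset() does above
        for (Aggregator<?> agg : aggregators.values()) {
            agg.reset();
        }

        System.out.println(previousGlobalAggregate.get("updates")); // prints 42
    }
}
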
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 71c15b1..45843ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -18,27 +18,30 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Map;
+
+/**
+ * Listener for {@link WorkerDoneEvent} which also aggregates all aggregators
+ * from iteration tasks and signals the end of the superstep.
+ */
 public class SyncEventHandler implements EventListener<TaskEvent> {
-	
+
 	private final ClassLoader userCodeClassLoader;
-	
+
 	private final Map<String, Aggregator<?>> aggregators;
 
 	private final int numberOfEventsUntilEndOfSuperstep;
 
 	private int workerDoneEventCounter;
-	
-	private boolean endOfSuperstep;
 
+	private boolean endOfSuperstep;
 
 	public SyncEventHandler(int numberOfEventsUntilEndOfSuperstep, Map<String, Aggregator<?>> aggregators, ClassLoader userCodeClassLoader) {
 		Preconditions.checkArgument(numberOfEventsUntilEndOfSuperstep > 0);
@@ -60,7 +63,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 		if (this.endOfSuperstep) {
 			throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
 		}
-		
+
 		workerDoneEventCounter++;
 
 		String[] aggNames = workerDoneEvent.getAggregatorNames();
@@ -69,7 +72,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 		if (aggNames.length != aggregates.length) {
 			throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
 		}
-		
+
 		for (int i = 0; i < aggNames.length; i++) {
 			@SuppressWarnings("unchecked")
 			Aggregator<Value> aggregator = (Aggregator<Value>) this.aggregators.get(aggNames[i]);
@@ -81,11 +84,11 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 			Thread.currentThread().interrupt();
 		}
 	}
-	
+
 	public boolean isEndOfSuperstep() {
 		return this.endOfSuperstep;
 	}
-	
+
 	public void resetEndOfSuperstep() {
 		this.endOfSuperstep = false;
 	}

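The handler above folds the parallel name/value arrays carried by each WorkerDoneEvent into the matching global aggregators. Below is a hedged, self-contained sketch of just that merge step; the event and reader plumbing is omitted and the aggregator name is illustrative.

import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.Value;

import java.util.HashMap;
import java.util.Map;

public class WorkerReportMergeSketch {

    private final Map<String, Aggregator<?>> aggregators = new HashMap<>();

    public WorkerReportMergeSketch() {
        aggregators.put("updates", new LongSumAggregator());
    }

    // Merge one worker's reported aggregates into the global aggregators.
    @SuppressWarnings("unchecked")
    public void mergeWorkerReport(String[] names, Value[] values) {
        if (names.length != values.length) {
            throw new IllegalArgumentException("inconsistent worker report");
        }
        for (int i = 0; i < names.length; i++) {
            Aggregator<Value> aggregator = (Aggregator<Value>) aggregators.get(names[i]);
            if (aggregator == null) {
                throw new IllegalStateException("unknown aggregator: " + names[i]);
            }
            aggregator.aggregate(values[i]);
        }
    }

    public static void main(String[] args) {
        WorkerReportMergeSketch handler = new WorkerReportMergeSketch();
        handler.mergeWorkerReport(new String[] {"updates"}, new Value[] {new LongValue(7)});
    }
}
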
http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
index d6f7544..9d952ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 /**
- * Models the functionality that the termination of an iterative task can be requested from outside
+ * Models the functionality by which the termination of an iterative task can be requested from outside.
  */
 public interface Terminable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
index d22a23b..c9015e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
@@ -16,25 +16,26 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link BlockingBackChannel}.
+ */
 public class BlockingBackChannelTest {
 
 	private static final int NUM_ITERATIONS = 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
index 3f2c243..e12cb32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.util.Preconditions;
+
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
-import org.apache.flink.util.Preconditions;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -35,6 +34,9 @@ import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link Broker}.
+ */
 public class BrokerTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
index 798d9f5..82ddac7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 class StringPair {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index 2f26670..1187adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -16,19 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
+
 import org.junit.Test;
 
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SuperstepBarrier}.
+ */
 public class SuperstepBarrierTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
index 8c8deb2..90555d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -21,30 +21,33 @@ package org.apache.flink.runtime.iterative.concurrent;
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Tests for {@link SuperstepKickoffLatch}.
+ */
 public class SuperstepKickoffLatchTest {
 
 	@Test
 	public void testWaitFromOne() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			latch.triggerNextSuperstep();
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -54,28 +57,28 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitAlreadyFulfilled() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -85,14 +88,14 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitIncorrect() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			try {
 				latch.awaitStartOfSuperstepOrTermination(2);
 				Assert.fail("should throw exception");
@@ -106,29 +109,29 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitIncorrectAsync() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				if (!(w.getError() instanceof IllegalStateException)) {
 					throw new Exception("wrong exception type " + w.getError());
@@ -142,29 +145,29 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitForTermination() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 4);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			latch.signalTermination();
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -174,16 +177,15 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	private static class Waiter implements Runnable {
 
 		private final SuperstepKickoffLatch latch;
-		
+
 		private final int waitFor;
-		
+
 		private volatile Throwable error;
-		
-		
+
 		public Waiter(SuperstepKickoffLatch latch, int waitFor) {
 			this.latch = latch;
 			this.waitFor = waitFor;
@@ -198,37 +200,37 @@ public class SuperstepKickoffLatchTest {
 				this.error = t;
 			}
 		}
-		
+
 		public Throwable getError() {
 			return error;
 		}
 	}
-	
+
 	private static class WatchDog extends Thread {
-		
+
 		private final Thread toWatch;
-		
+
 		private final long timeOut;
-		
+
 		private volatile Throwable failed;
-		
+
 		public WatchDog(Thread toWatch, long timeout) {
 			setDaemon(true);
 			setName("Watchdog");
 			this.toWatch = toWatch;
 			this.timeOut = timeout;
 		}
-		
+
 		@SuppressWarnings("deprecation")
 		@Override
 		public void run() {
 			try {
 				toWatch.join(timeOut);
-				
+
 				if (toWatch.isAlive()) {
 					this.failed = new Exception("timed out");
 					toWatch.interrupt();
-					
+
 					toWatch.join(2000);
 					if (toWatch.isAlive()) {
 						toWatch.stop();
@@ -239,7 +241,7 @@ public class SuperstepKickoffLatchTest {
 				failed = t;
 			}
 		}
-		
+
 		public Throwable getError() {
 			return failed;
 		}