Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/04 12:35:24 UTC

[1/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle for runtime/iterative

Repository: flink
Updated Branches:
  refs/heads/master 41806ba68 -> 85853edab


http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
index 10e0b11..341cff0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/event/EventWithAggregatorsTest.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -35,54 +28,63 @@ import org.apache.flink.types.Value;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
+/**
+ * Tests for {@link IterationEventWithAggregators}.
+ */
 public class EventWithAggregatorsTest {
-	
+
 	private ClassLoader cl = ClassLoader.getSystemClassLoader();
-	
+
 	@Test
 	public void testSerializationOfEmptyEvent() {
 		AllWorkersDoneEvent e = new AllWorkersDoneEvent();
 		IterationEventWithAggregators deserialized = pipeThroughSerialization(e);
-		
+
 		Assert.assertEquals(0, deserialized.getAggregatorNames().length);
 		Assert.assertEquals(0, deserialized.getAggregates(cl).length);
 	}
-	
+
 	@Test
 	public void testSerializationOfEventWithAggregateValues() {
 		StringValue stringValue = new StringValue("test string");
 		LongValue longValue = new LongValue(68743254);
-		
+
 		String stringValueName = "stringValue";
 		String longValueName = "longValue";
-		
+
 		Aggregator<StringValue> stringAgg = new TestAggregator<StringValue>(stringValue);
 		Aggregator<LongValue> longAgg = new TestAggregator<LongValue>(longValue);
-		
+
 		Map<String, Aggregator<?>> aggMap = new HashMap<String,  Aggregator<?>>();
 		aggMap.put(stringValueName, stringAgg);
 		aggMap.put(longValueName, longAgg);
-		
+
 		Set<String> allNames = new HashSet<String>();
 		allNames.add(stringValueName);
 		allNames.add(longValueName);
-		
+
 		Set<Value> allVals = new HashSet<Value>();
 		allVals.add(stringValue);
 		allVals.add(longValue);
-		
+
 		// run the serialization
 		AllWorkersDoneEvent e = new AllWorkersDoneEvent(aggMap);
 		IterationEventWithAggregators deserialized = pipeThroughSerialization(e);
-		
+
 		// verify the result
 		String[] names = deserialized.getAggregatorNames();
 		Value[] aggregates = deserialized.getAggregates(cl);
-		
+
 		Assert.assertEquals(allNames.size(), names.length);
 		Assert.assertEquals(allVals.size(), aggregates.length);
-		
+
 		// check that all the correct names and values are returned
 		for (String s : names) {
 			allNames.remove(s);
@@ -90,24 +92,24 @@ public class EventWithAggregatorsTest {
 		for (Value v : aggregates) {
 			allVals.remove(v);
 		}
-		
+
 		Assert.assertTrue(allNames.isEmpty());
 		Assert.assertTrue(allVals.isEmpty());
 	}
-	
+
 	private IterationEventWithAggregators pipeThroughSerialization(IterationEventWithAggregators event) {
 		try {
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
 			event.write(new DataOutputViewStreamWrapper(baos));
-			
+
 			byte[] data = baos.toByteArray();
 			baos.close();
-			
+
 			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
 			IterationEventWithAggregators newEvent = event.getClass().newInstance();
 			newEvent.read(in);
 			in.close();
-			
+
 			return newEvent;
 		}
 		catch (Exception e) {
@@ -117,14 +119,13 @@ public class EventWithAggregatorsTest {
 			return null;
 		}
 	}
-	
+
 	private static class TestAggregator<T extends Value> implements Aggregator<T> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final T val;
-		
-		
+
 		public TestAggregator(T val) {
 			this.val = val;
 		}


[4/7] flink git commit: [FLINK-6877] [runtime] Activate checkstyle for runtime/security

Posted by ch...@apache.org.
[FLINK-6877] [runtime] Activate checkstyle for runtime/security

This closes #4095.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31ad8020
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31ad8020
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31ad8020

Branch: refs/heads/master
Commit: 31ad80206f1fee234848f1288d493ee9428d309e
Parents: 834c527
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 09:58:24 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:25 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../runtime/security/DynamicConfiguration.java  | 18 +++++------
 .../runtime/security/HadoopSecurityContext.java |  1 +
 .../flink/runtime/security/KerberosUtils.java   | 15 ++++-----
 .../runtime/security/NoOpSecurityContext.java   |  1 +
 .../flink/runtime/security/SecurityContext.java |  1 +
 .../flink/runtime/security/SecurityUtils.java   | 32 +++++++++++---------
 .../runtime/security/modules/HadoopModule.java  |  9 ++++--
 .../runtime/security/modules/JaasModule.java    | 15 +++++----
 .../security/modules/SecurityModule.java        |  1 +
 .../security/modules/ZooKeeperModule.java       |  7 +++--
 .../runtime/security/KerberosUtilsTest.java     |  2 --
 .../runtime/security/SecurityUtilsTest.java     |  2 ++
 13 files changed, 59 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index df905dc..80f95a5 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -455,7 +455,6 @@ under the License.
 						**/runtime/registration/**,
 						**/runtime/resourcemanager/**,
 						**/runtime/rpc/**,
-						**/runtime/security/**,
 						**/runtime/state/**,
 						**/runtime/taskexecutor/**,
 						**/runtime/taskmanager/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
index 6af4f23..da1c9c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/DynamicConfiguration.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security;
 
 import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,7 +33,7 @@ import java.util.Map;
 /**
  * A dynamic JAAS configuration.
  *
- * Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
+ * <p>Makes it possible to define Application Configuration Entries (ACEs) at runtime, building upon
  * an (optional) underlying configuration.   Entries from the underlying configuration take
  * precedence over dynamic entries.
  */
@@ -41,7 +43,7 @@ public class DynamicConfiguration extends Configuration {
 
 	private final Configuration delegate;
 
-	private final Map<String,AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
+	private final Map<String, AppConfigurationEntry[]> dynamicEntries = new HashMap<>();
 
 	/**
 	 * Create a dynamic configuration.
@@ -57,7 +59,7 @@ public class DynamicConfiguration extends Configuration {
 	public void addAppConfigurationEntry(String name, AppConfigurationEntry... entry) {
 		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
 		final AppConfigurationEntry[] updated;
-		if(existing == null) {
+		if (existing == null) {
 			updated = Arrays.copyOf(entry, entry.length);
 		}
 		else {
@@ -70,8 +72,6 @@ public class DynamicConfiguration extends Configuration {
 	 * Retrieve the AppConfigurationEntries for the specified <i>name</i>
 	 * from this Configuration.
 	 *
-	 * <p>
-	 *
 	 * @param name the name used to index the Configuration.
 	 *
 	 * @return an array of AppConfigurationEntries for the specified <i>name</i>
@@ -81,12 +81,12 @@ public class DynamicConfiguration extends Configuration {
 	@Override
 	public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
 		AppConfigurationEntry[] entry = null;
-		if(delegate != null) {
+		if (delegate != null) {
 			entry = delegate.getAppConfigurationEntry(name);
 		}
 		final AppConfigurationEntry[] existing = dynamicEntries.get(name);
-		if(existing != null) {
-			if(entry != null) {
+		if (existing != null) {
+			if (entry != null) {
 				entry = merge(entry, existing);
 			}
 			else {
@@ -104,7 +104,7 @@ public class DynamicConfiguration extends Configuration {
 
 	@Override
 	public void refresh() {
-		if(delegate != null) {
+		if (delegate != null) {
 			delegate.refresh();
 		}
 	}
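
For reference, a minimal sketch of how the class above is meant to be used, assuming the constructor takes the (nullable) delegate field shown in the diff; the "SampleClient" context name and the login option are hypothetical:

import org.apache.flink.runtime.security.DynamicConfiguration;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

import java.util.Collections;

public class DynamicConfigurationSketch {
	public static void main(String[] args) {
		// build on whatever JAAS configuration is already installed; entries
		// from this delegate take precedence over the dynamic ones added below
		DynamicConfiguration conf = new DynamicConfiguration(Configuration.getConfiguration());

		AppConfigurationEntry krb5 = new AppConfigurationEntry(
			"com.sun.security.auth.module.Krb5LoginModule",
			LoginModuleControlFlag.REQUIRED,
			Collections.singletonMap("useTicketCache", "true"));
		conf.addAppConfigurationEntry("SampleClient", krb5);

		// install process-wide so subsequent JAAS logins see the merged view
		Configuration.setConfiguration(conf);
	}
}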

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
index c70f00b..7b0bb06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/HadoopSecurityContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.util.Preconditions;
+
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.security.PrivilegedExceptionAction;

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
index 7ef9187..5662d29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/KerberosUtils.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.annotation.Internal;
+
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.AppConfigurationEntry;
+
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,10 +33,9 @@ import java.util.Map;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- *
  * Provides vendor-specific Kerberos {@link AppConfigurationEntry} instances.
  *
- * The implementation is inspired from Hadoop UGI class.
+ * <p>The implementation is inspired from Hadoop UGI class.
  */
 @Internal
 public class KerberosUtils {
@@ -55,11 +56,11 @@ public class KerberosUtils {
 
 		IBM_JAVA = JAVA_VENDOR_NAME.contains("IBM");
 
-		if(LOG.isDebugEnabled()) {
+		if (LOG.isDebugEnabled()) {
 			debugOptions.put("debug", "true");
 		}
 
-		if(IBM_JAVA) {
+		if (IBM_JAVA) {
 			kerberosCacheOptions.put("useDefaultCcache", "true");
 		} else {
 			kerberosCacheOptions.put("doNotPrompt", "true");
@@ -67,8 +68,8 @@ public class KerberosUtils {
 		}
 
 		String ticketCache = System.getenv("KRB5CCNAME");
-		if(ticketCache != null) {
-			if(IBM_JAVA) {
+		if (ticketCache != null) {
+			if (IBM_JAVA) {
 				System.setProperty("KRB5CCNAME", ticketCache);
 			} else {
 				kerberosCacheOptions.put("ticketCache", ticketCache);
@@ -96,7 +97,7 @@ public class KerberosUtils {
 
 		Map<String, String> keytabKerberosOptions = new HashMap<>();
 
-		if(IBM_JAVA) {
+		if (IBM_JAVA) {
 			keytabKerberosOptions.put("useKeytab", prependFileUri(keytab));
 			keytabKerberosOptions.put("credsType", "both");
 		} else {
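
The static initializer above branches on the JVM vendor; for the Oracle/OpenJDK path the entry it assembles looks roughly like this sketch (the control flag and option values are illustrative, not the committed ones):

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

import java.util.HashMap;
import java.util.Map;

public class Krb5EntrySketch {
	public static AppConfigurationEntry ticketCacheEntry() {
		Map<String, String> options = new HashMap<>();
		options.put("doNotPrompt", "true");
		options.put("useTicketCache", "true");
		String ticketCache = System.getenv("KRB5CCNAME");
		if (ticketCache != null) {
			// non-IBM JVMs take the cache location as a login option
			options.put("ticketCache", ticketCache);
		}
		return new AppConfigurationEntry(
			"com.sun.security.auth.module.Krb5LoginModule",
			LoginModuleControlFlag.OPTIONAL,
			options);
	}
}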

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
index 4574db5..02f97da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/NoOpSecurityContext.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security;
 
 import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
index 02892d3..63ca728 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security;
 
 import java.util.concurrent.Callable;

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index b874009..df49822 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.security;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +27,9 @@ import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.security.modules.JaasModule;
 import org.apache.flink.runtime.security.modules.SecurityModule;
 import org.apache.flink.runtime.security.modules.ZooKeeperModule;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,9 +42,8 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/*
+/**
  * Utils for configuring security. The following security subsystems are supported:
- *
  * 1. Java Authentication and Authorization Service (JAAS)
  * 2. Hadoop's User Group Information (UGI)
  * 3. ZooKeeper's process-wide security settings.
@@ -56,7 +56,9 @@ public class SecurityUtils {
 
 	private static List<SecurityModule> installedModules = null;
 
-	public static SecurityContext getInstalledContext() { return installedContext; }
+	public static SecurityContext getInstalledContext() {
+		return installedContext;
+	}
 
 	@VisibleForTesting
 	static List<SecurityModule> getInstalledModules() {
@@ -66,7 +68,7 @@ public class SecurityUtils {
 	/**
 	 * Installs a process-wide security configuration.
 	 *
-	 * Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
+	 * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 	 */
 	public static void install(SecurityConfiguration config) throws Exception {
 
@@ -79,7 +81,7 @@ public class SecurityUtils {
 				modules.add(module);
 			}
 		}
-		catch(Exception ex) {
+		catch (Exception ex) {
 			throw new Exception("unable to establish the security context", ex);
 		}
 		installedModules = modules;
@@ -94,14 +96,14 @@ public class SecurityUtils {
 	}
 
 	static void uninstall() {
-		if(installedModules != null) {
+		if (installedModules != null) {
 			for (SecurityModule module : Lists.reverse(installedModules)) {
 				try {
 					module.uninstall();
 				}
-				catch(UnsupportedOperationException ignored) {
+				catch (UnsupportedOperationException ignored) {
 				}
-				catch(SecurityModule.SecurityInstallException e) {
+				catch (SecurityModule.SecurityInstallException e) {
 					LOG.warn("unable to uninstall a security module", e);
 				}
 			}
@@ -114,7 +116,7 @@ public class SecurityUtils {
 	/**
 	 * The global security configuration.
 	 *
-	 * See {@link SecurityOptions} for corresponding configuration options.
+	 * <p>See {@link SecurityOptions} for corresponding configuration options.
 	 */
 	public static class SecurityConfiguration {
 
@@ -215,22 +217,22 @@ public class SecurityUtils {
 		}
 
 		private void validate() {
-			if(!StringUtils.isBlank(keytab)) {
+			if (!StringUtils.isBlank(keytab)) {
 				// principal is required
-				if(StringUtils.isBlank(principal)) {
+				if (StringUtils.isBlank(principal)) {
 					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab requires a principal.");
 				}
 
 				// check the keytab is readable
 				File keytabFile = new File(keytab);
-				if(!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
+				if (!keytabFile.exists() || !keytabFile.isFile() || !keytabFile.canRead()) {
 					throw new IllegalConfigurationException("Kerberos login configuration is invalid; keytab is unreadable");
 				}
 			}
 		}
 
 		private static List<String> parseList(String value) {
-			if(value == null || value.isEmpty()) {
+			if (value == null || value.isEmpty()) {
 				return Collections.emptyList();
 			}
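
End to end, installation is a one-liner; a sketch assuming a single-argument SecurityConfiguration(Configuration) constructor, the standard SecurityOptions keys, and SecurityContext#runSecured(Callable) — keytab path and principal are hypothetical:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;

public class SecurityInstallSketch {
	public static void main(String[] args) throws Exception {
		Configuration flinkConf = new Configuration();
		flinkConf.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, "/path/to/user.keytab");
		flinkConf.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "user@EXAMPLE.COM");

		// installs the JAAS, Hadoop UGI and ZooKeeper modules process-wide
		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConf));

		// privileged work then runs through the installed context
		SecurityUtils.getInstalledContext().runSecured(() -> {
			// e.g. secured HDFS access
			return null;
		});
	}
}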
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 5c62272..05be314 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security.modules;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -29,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.Subject;
+
 import java.io.File;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -75,11 +78,11 @@ public class HadoopModule implements SecurityModule {
 						// and does not fallback to using Kerberos tickets
 						Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
 						Credentials credentials = new Credentials();
-						final Text HDFS_DELEGATION_TOKEN_KIND = new Text("HDFS_DELEGATION_TOKEN");
+						final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
 						Collection<Token<? extends TokenIdentifier>> usrTok = (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
 						//If UGI use keytab for login, do not load HDFS delegation token.
 						for (Token<? extends TokenIdentifier> token : usrTok) {
-							if (!token.getKind().equals(HDFS_DELEGATION_TOKEN_KIND)) {
+							if (!token.getKind().equals(hdfsDelegationTokenKind)) {
 								final Text id = new Text(token.getIdentifier());
 								credentials.addToken(id, token);
 							}

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
index f8b9bdf..91411a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/JaasModule.java
@@ -15,16 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.security.DynamicConfiguration;
 import org.apache.flink.runtime.security.KerberosUtils;
 import org.apache.flink.runtime.security.SecurityUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.AppConfigurationEntry;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,13 +37,13 @@ import java.nio.file.StandardCopyOption;
 
 /**
  * Responsible for installing a process-wide JAAS configuration.
- * <p>
- * The installed configuration combines login modules based on:
+ *
+ * <p>The installed configuration combines login modules based on:
  * - the user-supplied JAAS configuration file, if any
  * - a Kerberos keytab, if configured
  * - any cached Kerberos credentials from the current environment
- * <p>
- * The module also installs a default JAAS config file (if necessary) for
+ *
+ * <p>The module also installs a default JAAS config file (if necessary) for
  * compatibility with ZK and Kafka.  Note that the JRE actually draws on numerous file locations.
  * See: https://docs.oracle.com/javase/7/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html
  * See: https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
@@ -78,7 +81,7 @@ public class JaasModule implements SecurityModule {
 
 		// wire up the configured JAAS login contexts to use the krb5 entries
 		AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
-		if(krb5Entries != null) {
+		if (krb5Entries != null) {
 			for (String app : securityConfig.getLoginContextNames()) {
 				currentConfig.addAppConfigurationEntry(app, krb5Entries);
 			}
@@ -89,7 +92,7 @@ public class JaasModule implements SecurityModule {
 
 	@Override
 	public void uninstall() throws SecurityInstallException {
-		if(priorConfigFile != null) {
+		if (priorConfigFile != null) {
 			System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, priorConfigFile);
 		} else {
 			System.clearProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
index fbe1db9..1a335df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/SecurityModule.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.runtime.security.SecurityUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index 216bdde..af2c1f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security.modules;
 
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -66,17 +67,17 @@ public class ZooKeeperModule implements SecurityModule {
 
 	@Override
 	public void uninstall() throws SecurityInstallException {
-		if(priorSaslEnable != null) {
+		if (priorSaslEnable != null) {
 			System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
 		} else {
 			System.clearProperty(ZK_ENABLE_CLIENT_SASL);
 		}
-		if(priorServiceName != null) {
+		if (priorServiceName != null) {
 			System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
 		} else {
 			System.clearProperty(ZK_SASL_CLIENT_USERNAME);
 		}
-		if(priorLoginContextName != null) {
+		if (priorLoginContextName != null) {
 			System.setProperty(ZK_LOGIN_CONTEXT_NAME, priorLoginContextName);
 		} else {
 			System.clearProperty(ZK_LOGIN_CONTEXT_NAME);
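
The uninstall logic above is the classic save-and-restore pattern for process-wide system properties; in isolation (property name hypothetical):

public class SystemPropertySwap {
	private static final String KEY = "example.sasl.client";

	private String prior;

	void install(String newValue) {
		prior = System.getProperty(KEY);    // remember what was there before
		System.setProperty(KEY, newValue);
	}

	void uninstall() {
		if (prior != null) {
			System.setProperty(KEY, prior); // restore the previous value
		} else {
			System.clearProperty(KEY);      // nothing was set before install
		}
	}
}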

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
index 4c899e8..58817db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/KerberosUtilsTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.runtime.security;
 
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
 import org.junit.Test;
 
 import javax.security.auth.login.AppConfigurationEntry;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/31ad8020/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
index 3e3808b..c179f6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityUtilsTest.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.security;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.security.modules.SecurityModule;
+
 import org.junit.AfterClass;
 import org.junit.Test;
 


[2/7] flink git commit: [FLINK-6880] [runtime] Activate checkstyle for runtime/iterative

Posted by ch...@apache.org.
[FLINK-6880] [runtime] Activate checkstyle for runtime/iterative

This closes #4098.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/834c5277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/834c5277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/834c5277

Branch: refs/heads/master
Commit: 834c527783346f777bf5def4c61f1791b2a89473
Parents: 343a804
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 12:53:41 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../concurrent/BlockingBackChannel.java         | 13 ++-
 .../concurrent/BlockingBackChannelBroker.java   |  8 +-
 .../runtime/iterative/concurrent/Broker.java    | 17 ++--
 .../concurrent/IterationAggregatorBroker.java   | 11 ++-
 .../iterative/concurrent/SolutionSetBroker.java |  5 +-
 .../SolutionSetUpdateBarrierBroker.java         |  5 +-
 .../iterative/concurrent/SuperstepBarrier.java  | 22 +++--
 .../concurrent/SuperstepKickoffLatch.java       | 15 ++--
 .../concurrent/SuperstepKickoffLatchBroker.java |  7 +-
 .../WorksetEmptyConvergenceCriterion.java       | 10 +--
 .../iterative/event/AllWorkersDoneEvent.java    | 11 ++-
 .../event/IterationEventWithAggregators.java    | 49 ++++++------
 .../iterative/event/TerminationEvent.java       |  9 +--
 .../iterative/event/WorkerDoneEvent.java        | 23 +++---
 .../iterative/io/HashPartitionIterator.java     | 12 +--
 .../iterative/io/SerializedUpdateBuffer.java    | 21 ++---
 .../SolutionSetFastUpdateOutputCollector.java   | 12 +--
 ...SolutionSetObjectsUpdateOutputCollector.java | 14 ++--
 .../io/SolutionSetUpdateOutputCollector.java    | 14 ++--
 .../io/WorksetUpdateOutputCollector.java        |  9 +--
 .../iterative/task/AbstractIterativeTask.java   | 56 ++++++-------
 .../iterative/task/IterationHeadTask.java       | 66 +++++++--------
 .../task/IterationIntermediateTask.java         | 23 +++---
 .../task/IterationSynchronizationSinkTask.java  | 52 ++++++------
 .../iterative/task/IterationTailTask.java       | 21 ++---
 .../task/RuntimeAggregatorRegistry.java         | 30 +++----
 .../iterative/task/SyncEventHandler.java        | 25 +++---
 .../runtime/iterative/task/Terminable.java      |  3 +-
 .../concurrent/BlockingBackChannelTest.java     | 21 ++---
 .../iterative/concurrent/BrokerTest.java        |  8 +-
 .../iterative/concurrent/StringPair.java        |  1 -
 .../concurrent/SuperstepBarrierTest.java        | 15 ++--
 .../concurrent/SuperstepKickoffLatchTest.java   | 84 ++++++++++----------
 .../event/EventWithAggregatorsTest.java         | 59 +++++++-------
 35 files changed, 383 insertions(+), 369 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 654227a..df905dc 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -441,7 +441,6 @@ under the License.
 						**/runtime/highavailability/**,
 						**/runtime/instance/**,
 						**/runtime/io/**,
-						**/runtime/iterative/**,
 						**/runtime/jobgraph/**,
 						**/runtime/jobmanager/**,
 						**/runtime/jobmaster/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
index 067bbfe..1373654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannel.java
@@ -16,27 +16,26 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
 
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
 /**
  * A concurrent datastructure that establishes a backchannel buffer between an iteration head
  * and an iteration tail.
  */
 public class BlockingBackChannel {
 
-	/** buffer to send back the superstep results */
+	/** Buffer to send back the superstep results. */
 	private final SerializedUpdateBuffer buffer;
 
-	/** a one element queue used for blocking hand over of the buffer */
+	/** A one element queue used for blocking hand over of the buffer. */
 	private final BlockingQueue<SerializedUpdateBuffer> queue;
 
 	public BlockingBackChannel(SerializedUpdateBuffer buffer) {
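
A stripped-down sketch of the one-element hand-over the back channel is built on: the tail publishes the buffer, the head blocks until it arrives.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OneShotHandover<T> {

	private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1);

	/** Called by the producing side (the iteration tail). */
	public void publish(T value) throws InterruptedException {
		queue.put(value);
	}

	/** Called by the consuming side (the iteration head); blocks until published. */
	public T await() throws InterruptedException {
		return queue.take();
	}
}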

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
index daa3ec3..120ecf1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelBroker.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 /**
- * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails
+ * Singleton class for the threadsafe handover of {@link BlockingBackChannel}s from iteration heads to iteration tails.
  */
 public class BlockingBackChannelBroker extends Broker<BlockingBackChannel> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final BlockingBackChannelBroker INSTANCE = new BlockingBackChannelBroker();
 
 	private BlockingBackChannelBroker() {}
 
 	/**
-	 * retrieve singleton instance
+	 * Retrieve singleton instance.
 	 */
 	public static Broker<BlockingBackChannel> instance() {
 		return INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
index 444d21f..6816af4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/Broker.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 import java.util.concurrent.ArrayBlockingQueue;
@@ -25,14 +24,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * A concurrent data structure that allows the hand-over of an object between a pair of threads
+ * A concurrent data structure that allows the hand-over of an object between a pair of threads.
  */
 public class Broker<V> {
 
 	private final ConcurrentMap<String, BlockingQueue<V>> mediations = new ConcurrentHashMap<String, BlockingQueue<V>>();
 
 	/**
-	 * hand in the object to share
+	 * Hand in the object to share.
 	 */
 	public void handIn(String key, V obj) {
 		if (!retrieveSharedQueue(key).offer(obj)) {
@@ -40,7 +39,7 @@ public class Broker<V> {
 		}
 	}
 
-	/** blocking retrieval and removal of the object to share */
+	/** Blocking retrieval and removal of the object to share. */
 	public V getAndRemove(String key) {
 		try {
 			V objToShare = retrieveSharedQueue(key).take();
@@ -50,13 +49,13 @@ public class Broker<V> {
 			throw new RuntimeException(e);
 		}
 	}
-	
-	/** blocking retrieval and removal of the object to share */
+
+	/** Blocking retrieval and removal of the object to share. */
 	public void remove(String key) {
 		mediations.remove(key);
 	}
-	
-	/** blocking retrieval and removal of the object to share */
+
+	/** Blocking retrieval and removal of the object to share. */
 	public V get(String key) {
 		try {
 			BlockingQueue<V> queue = retrieveSharedQueue(key);
@@ -71,7 +70,7 @@ public class Broker<V> {
 	}
 
 	/**
-	 * thread-safe call to get a shared {@link BlockingQueue}
+	 * Thread-safe call to get a shared {@link BlockingQueue}.
 	 */
 	private BlockingQueue<V> retrieveSharedQueue(String key) {
 		BlockingQueue<V> queue = mediations.get(key);
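
Usage sketch for the Broker above: two threads rendezvous on a shared key (the key string here is hypothetical).

import org.apache.flink.runtime.iterative.concurrent.Broker;

public class BrokerHandoverSketch {
	public static void main(String[] args) throws Exception {
		final Broker<String> broker = new Broker<>();
		final String key = "iteration-0-backchannel";

		Thread producer = new Thread(() -> broker.handIn(key, "payload"));
		producer.start();

		// blocks until the producer has handed the object in, then removes
		// the underlying queue so the key can be reused
		String payload = broker.getAndRemove(key);
		System.out.println(payload);
		producer.join();
	}
}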

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
index bf509f6..249a492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/IterationAggregatorBroker.java
@@ -16,17 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
 
+/**
+ * {@link Broker} for {@link RuntimeAggregatorRegistry}.
+ */
 public class IterationAggregatorBroker extends Broker<RuntimeAggregatorRegistry> {
-	
-	/** single instance */
+
 	private static final IterationAggregatorBroker INSTANCE = new IterationAggregatorBroker();
 
-	/** retrieve singleton instance */
+	/**
+	 * Retrieve singleton instance.
+	 */
 	public static IterationAggregatorBroker instance() {
 		return INSTANCE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
index 3d9d0ea..f9ba0c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetBroker.java
@@ -23,9 +23,6 @@ package org.apache.flink.runtime.iterative.concurrent;
  */
 public class SolutionSetBroker extends Broker<Object> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final SolutionSetBroker INSTANCE = new SolutionSetBroker();
 
 	/**
@@ -34,6 +31,6 @@ public class SolutionSetBroker extends Broker<Object> {
 	public static Broker<Object> instance() {
 		return INSTANCE;
 	}
-	
+
 	private SolutionSetBroker() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index 352a262..9a1e40a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -22,15 +22,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadTask;
 import org.apache.flink.runtime.iterative.task.IterationTailTask;
 
 /**
- * Broker to hand over {@link SolutionSetUpdateBarrier} from 
+ * Broker to hand over {@link SolutionSetUpdateBarrier} from
  * {@link IterationHeadTask} to
  * {@link IterationTailTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
 
-	/**
-	 * Singleton instance
-	 */
 	private static final SolutionSetUpdateBarrierBroker INSTANCE = new SolutionSetUpdateBarrierBroker();
 
 	private SolutionSetUpdateBarrierBroker() {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
index cc5d3c5..54bb870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrier.java
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.types.Value;
 
+import java.util.concurrent.CountDownLatch;
+
 /**
  * A resettable one-shot latch.
  */
 public class SuperstepBarrier implements EventListener<TaskEvent> {
-	
+
 	private final ClassLoader userCodeClassLoader;
 
 	private boolean terminationSignaled = false;
@@ -40,19 +39,17 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
 
 	private String[] aggregatorNames;
 	private Value[] aggregates;
-	
-	
+
 	public SuperstepBarrier(ClassLoader userCodeClassLoader) {
 		this.userCodeClassLoader = userCodeClassLoader;
 	}
-	
 
-	/** setup the barrier, has to be called at the beginning of each superstep */
+	/** Setup the barrier, has to be called at the beginning of each superstep. */
 	public void setup() {
 		latch = new CountDownLatch(1);
 	}
 
-	/** wait on the barrier */
+	/** Wait on the barrier. */
 	public void waitForOtherWorkers() throws InterruptedException {
 		latch.await();
 	}
@@ -60,13 +57,12 @@ public class SuperstepBarrier implements EventListener<TaskEvent> {
 	public String[] getAggregatorNames() {
 		return aggregatorNames;
 	}
-	
+
 	public Value[] getAggregates() {
 		return aggregates;
 	}
 
-	/** barrier will release the waiting thread if an event occurs
-	 * @param event*/
+	/** Barrier will release the waiting thread if an event occurs. */
 	@Override
 	public void onEvent(TaskEvent event) {
 		if (event instanceof TerminationEvent) {
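
Lifecycle sketch for the barrier: re-armed once per superstep via setup(), released when the synchronization task's event lands (delivering the event directly here is only to demonstrate the release; in the runtime it arrives over the network):

import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;

public class BarrierSketch {
	public static void main(String[] args) throws InterruptedException {
		SuperstepBarrier barrier = new SuperstepBarrier(BarrierSketch.class.getClassLoader());
		barrier.setup();                // arm for this superstep

		new Thread(() -> barrier.onEvent(new AllWorkersDoneEvent())).start();

		barrier.waitForOtherWorkers();  // blocks until the event is delivered
	}
}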

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
index 83b7a4a..b86ade2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatch.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+/**
+ * Latch used to wait for the previous superstep to complete.
+ */
 public class SuperstepKickoffLatch {
-	
+
 	private final Object monitor = new Object();
-	
+
 	private int superstepNumber = 1;
-	
+
 	private boolean terminated;
-	
+
 	public void triggerNextSuperstep() {
 		synchronized (monitor) {
 			if (terminated) {
@@ -35,14 +38,14 @@ public class SuperstepKickoffLatch {
 			monitor.notifyAll();
 		}
 	}
-	
+
 	public void signalTermination() {
 		synchronized (monitor) {
 			terminated = true;
 			monitor.notifyAll();
 		}
 	}
-	
+
 	public boolean awaitStartOfSuperstepOrTermination(int superstep) throws InterruptedException {
 		while (true) {
 			synchronized (monitor) {
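
Usage sketch: a worker blocks until superstep 2 may start while the head thread kicks it off; presumably the return value is true on termination and false once the superstep may start, per the method name.

import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;

public class KickoffSketch {
	public static void main(String[] args) throws Exception {
		SuperstepKickoffLatch latch = new SuperstepKickoffLatch();

		Thread head = new Thread(latch::triggerNextSuperstep);
		head.start();

		boolean terminated = latch.awaitStartOfSuperstepOrTermination(2);
		System.out.println("terminated = " + terminated);
		head.join();
	}
}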

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
index f137680..3b545c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchBroker.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+/**
+ * {@link Broker} for {@link SuperstepKickoffLatch}.
+ */
 public class SuperstepKickoffLatchBroker extends Broker<SuperstepKickoffLatch> {
 
 	private static final SuperstepKickoffLatchBroker INSTANCE = new SuperstepKickoffLatchBroker();
 
 	private SuperstepKickoffLatchBroker() {}
 
-
+	/**
+	 * Retrieve the singleton instance.
+	 */
 	public static Broker<SuperstepKickoffLatch> instance() {
 		return INSTANCE;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index 2987b89..19c26bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.convergence;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.types.LongValue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A workset iteration is by definition converged if no records have been updated in the solutionset
+ * A workset iteration is by definition converged if no records have been updated in the solutionset.
  */
 public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<LongValue> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger log = LoggerFactory.getLogger(WorksetEmptyConvergenceCriterion.class);
-	
+
 	public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
 
 	@Override
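
The hunk is cut off at the @Override above; a criterion with this contract looks roughly like the following sketch (not the committed body), assuming the ConvergenceCriterion#isConverged(int, T) signature:

import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.types.LongValue;

public class EmptyCountCriterion implements ConvergenceCriterion<LongValue> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isConverged(int iteration, LongValue value) {
		// converged as soon as no records were updated in the superstep
		return value.getValue() == 0;
	}
}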

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
index 2fd0db1..e62bf78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/AllWorkersDoneEvent.java
@@ -16,19 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 
+import java.util.Map;
+
+/**
+ * Event sent by the {@code IterationSynchronizationSinkTask} to each
+ * {@code IterationHead} signaling to start a new superstep.
+ */
 public class AllWorkersDoneEvent extends IterationEventWithAggregators {
 
 	public AllWorkersDoneEvent() {
 		super();
 	}
-	
+
 	public AllWorkersDoneEvent(Map<String, Aggregator<?>> aggregators) {
 		super(aggregators);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index 4e1c19e..06b7687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -32,13 +27,21 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for iteration {@link TaskEvent} transmitting operator aggregators.
+ */
 public abstract class IterationEventWithAggregators extends TaskEvent {
-	
+
 	protected static final String[] NO_STRINGS = new String[0];
 	protected static final Value[] NO_VALUES = new Value[0];
-	
+
 	private String[] aggNames;
-	
+
 	private String[] classNames;
 	private byte[][] serializedData;
 
@@ -53,11 +56,11 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 		if (aggregatorName == null || aggregate == null) {
 			throw new NullPointerException();
 		}
-		
+
 		this.aggNames = new String[] { aggregatorName };
 		this.aggregates = new Value[] { aggregate };
 	}
-	
+
 	protected IterationEventWithAggregators(Map<String, Aggregator<?>> aggregators) {
 		int num = aggregators.size();
 		if (num == 0) {
@@ -66,7 +69,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 		} else {
 			this.aggNames = new String[num];
 			this.aggregates = new Value[num];
-			
+
 			int i = 0;
 			for (Map.Entry<String, Aggregator<?>> entry : aggregators.entrySet()) {
 				this.aggNames[i] = entry.getKey();
@@ -75,7 +78,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 			}
 		}
 	}
-	
+
 	public String[] getAggregatorNames() {
 		return this.aggNames;
 	}
@@ -97,21 +100,19 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 				catch (ClassCastException e) {
 					throw new RuntimeException("User-defined aggregator class is not a value sublass.");
 				}
-				
-				
+
 				try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
-					new ByteArrayInputStream(serializedData[i])))
-				{
+					new ByteArrayInputStream(serializedData[i]))) {
 					v.read(in);
 				}
 				catch (IOException e) {
 					throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
 				}
-				
+
 				aggregates[i] = v;
 			}
 		}
-		
+
 		return this.aggregates;
 	}
 
@@ -119,15 +120,15 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 	public void write(DataOutputView out) throws IOException {
 		int num = this.aggNames.length;
 		out.writeInt(num);
-		
+
 		ByteArrayOutputStream boas = new ByteArrayOutputStream();
 		DataOutputViewStreamWrapper bufferStream = new DataOutputViewStreamWrapper(boas);
-		
+
 		for (int i = 0; i < num; i++) {
 			// aggregator name and type
 			out.writeUTF(this.aggNames[i]);
 			out.writeUTF(this.aggregates[i].getClass().getName());
-			
+
 			// aggregator value indirect as a byte array
 			this.aggregates[i].write(bufferStream);
 			bufferStream.flush();
@@ -160,14 +161,14 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 			for (int i = 0; i < num; i++) {
 				this.aggNames[i] = in.readUTF();
 				this.classNames[i] = in.readUTF();
-				
+
 				int len = in.readInt();
 				byte[] data = new byte[len];
 				this.serializedData[i] = data;
 				in.readFully(data);
-				
+
 			}
-			
+
 			this.aggregates = null;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
index 28181e8..f1523df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/TerminationEvent.java
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.IOException;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.TaskEvent;
 
+import java.io.IOException;
+
 /**
- * Signals that the iteration is completely executed, participating tasks must terminate now
+ * Signals that the iteration is completely executed, participating tasks must terminate now.
  */
 public class TerminationEvent extends TaskEvent {
 
 	public static final TerminationEvent INSTANCE = new TerminationEvent();
-	
+
 	@Override
 	public void write(DataOutputView out) throws IOException {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
index 2e348e9..ce24f23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/WorkerDoneEvent.java
@@ -16,21 +16,24 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Completion event sent from each {@code IterationHead} to the
+ * {@code IterationSynchronizationSinkTask}.
+ */
 public class WorkerDoneEvent extends IterationEventWithAggregators {
-	
+
 	private int workerIndex;
-	
+
 	public WorkerDoneEvent() {
 		super();
 	}
@@ -39,22 +42,22 @@ public class WorkerDoneEvent extends IterationEventWithAggregators {
 		super(aggregatorName, aggregate);
 		this.workerIndex = workerIndex;
 	}
-	
+
 	public WorkerDoneEvent(int workerIndex, Map<String, Aggregator<?>> aggregators) {
 		super(aggregators);
 		this.workerIndex = workerIndex;
 	}
-	
+
 	public int getWorkerIndex() {
 		return workerIndex;
 	}
-	
+
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.workerIndex);
 		super.write(out);
 	}
-	
+
 	@Override
 	public void read(DataInputView in) throws IOException {
 		this.workerIndex = in.readInt();
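
A minimal round trip through this event, in the same style as the serialization test earlier in this commit; the LongSumAggregator setup and names are only illustrative.

    LongSumAggregator agg = new LongSumAggregator();
    agg.aggregate(42L);
    Map<String, Aggregator<?>> aggs = Collections.singletonMap("count", agg);

    WorkerDoneEvent event = new WorkerDoneEvent(3, aggs);

    // serialize ...
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    event.write(new DataOutputViewStreamWrapper(bos));

    // ... and deserialize again
    WorkerDoneEvent copy = new WorkerDoneEvent();
    copy.read(new DataInputViewStreamWrapper(new ByteArrayInputStream(bos.toByteArray())));
    // copy.getWorkerIndex() == 3; copy.getAggregates(classLoader) yields the LongValue 42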

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
index 93ae55f..5bd3131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/HashPartitionIterator.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.hash.HashPartition;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Iterator;
+
 /**
- * {@link Iterator} over the build side entries of a {@link HashPartition}
- * 
+ * {@link Iterator} over the build side entries of a {@link HashPartition}.
+ *
  * @param <BT> The type of the build side records.
  */
 public class HashPartitionIterator<BT, PT> implements MutableObjectIterator<BT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index 7776894..353dcca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -16,9 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -28,14 +35,10 @@ import java.util.Deque;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
-
+/**
+ * {@link AbstractPagedOutputView} used by the {@code BlockingBackChannel} for
+ * transmitting superstep results.
+ */
 public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 	private static final int HEADER_LENGTH = 4;
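
The surrounding usage pattern, hedged: the head writes one superstep's records into this buffer through the AbstractPagedOutputView interface, and the reading side consumes them after the buffers are flipped. The switchBuffers() call below matches how the BlockingBackChannel hands the read end to the consumer, but treat the exact call as an assumption; serializer and buffer setup are elided.

    // write side (one superstep's updates)
    serializer.serialize(record, buffer);           // buffer is the SerializedUpdateBuffer

    // flip to the read side once the superstep's data is complete (assumed API)
    DataInputView readEnd = buffer.switchBuffers();
    T next = serializer.deserialize(serializer.createInstance(), readEnd);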

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index f326d89..10962d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Assumption for fast updates: the build side iterator of the hash table is already positioned for the update. This
  * is for example the case when a solution set update happens directly after a solution set join. If this assumption
  * doesn't hold, use {@link SolutionSetUpdateOutputCollector}, which probes the hash table before updating.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
index 21a9cc8..6a57dfc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetObjectsUpdateOutputCollector.java
@@ -24,12 +24,12 @@ import org.apache.flink.util.Collector;
 
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected if there is a match after probing the hash table. If the build side iterator is
  * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to save the re-probing.
- * 
+ *
  * @see SolutionSetFastUpdateOutputCollector
  */
 public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T> {
@@ -37,9 +37,9 @@ public class SolutionSetObjectsUpdateOutputCollector<T> implements Collector<T>
 	private final Collector<T> delegate;
 
 	private final JoinHashMap<T> hashMap;
-	
+
 	private final TypeSerializer<T> serializer;
-	
+
 	public SolutionSetObjectsUpdateOutputCollector(JoinHashMap<T> hashMap) {
 		this(hashMap, null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index c39efa5..90041c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the solution set of a workset iteration.
- * <p>
- * The records are written to a HashTable hash table to allow in-memory point updates.
- * <p>
- * Records will only be collected, if there is a match after probing the hash table. If the build side iterator is
+ *
+ * <p>The records are written to a hash table to allow in-memory point updates.
+ *
+ * <p>Records will only be collected if there is a match after probing the hash table. If the build side iterator is
  * already positioned for the update, use {@link SolutionSetFastUpdateOutputCollector} to save the re-probing.
- * 
+ *
  * @see SolutionSetFastUpdateOutputCollector
  */
 public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
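
Stated as code, the difference between the two collectors is just whether collect() probes. A hedged sketch of the re-probing variant (hashTable and delegate are fields of the collector; the CompactingHashTable method name is an assumption):

    @Override
    public void collect(T record) {
        try {
            // probe for the key, then point-update the matching entry in place
            hashTable.insertOrReplaceRecord(record);
            if (delegate != null) {
                delegate.collect(record); // optionally forward downstream
            }
        } catch (IOException e) {
            throw new RuntimeException("Solution set update failed.", e);
        }
    }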

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
index ad1d274..329e768 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/WorksetUpdateOutputCollector.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
-import java.io.IOException;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Collector;
 
+import java.io.IOException;
+
 /**
  * A {@link Collector} to update the iteration workset (partial solution for bulk iterations).
- * <p>
- * The records are written to a {@link DataOutputView} to allow in-memory data exchange.
+ *
+ * <p>The records are written to a {@link DataOutputView} to allow in-memory data exchange.
  */
 public class WorksetUpdateOutputCollector<T> implements Collector<T> {
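
Its contract is small enough to restate: serialize each record into the back channel's DataOutputView and count it, so that the iteration can later aggregate the number of collected elements (getElementsCollectedAndReset() is the real accessor, used by the intermediate task below; the field names here are illustrative).

    private long elementsCollected;

    @Override
    public void collect(T record) {
        try {
            serializer.serialize(record, outputView); // in-memory exchange, no network hop
            elementsCollected++;
        } catch (IOException e) {
            throw new RuntimeException("Unable to serialize the record", e);
        }
    }

    public long getElementsCollectedAndReset() {
        long count = elementsCollected;
        elementsCollected = 0;
        return count;
    }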
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
index 047fd7e..bde358c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -21,11 +21,6 @@ package org.apache.flink.runtime.iterative.task;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.Function;
@@ -33,7 +28,9 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
@@ -45,6 +42,7 @@ import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCri
 import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,6 +53,9 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
@@ -64,10 +65,9 @@ import java.util.concurrent.Future;
  * The abstract base class for all tasks able to participate in an iteration.
  */
 public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
-		implements Terminable
-{
+		implements Terminable {
 	private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
-	
+
 	protected LongSumAggregator worksetAggregator;
 
 	protected BlockingBackChannel worksetBackChannel;
@@ -77,14 +77,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	protected boolean isWorksetUpdate;
 
 	protected boolean isSolutionSetUpdate;
-	
 
 	private RuntimeAggregatorRegistry iterationAggregators;
 
 	private String brokerKey;
 
 	private int superstepNum = 1;
-	
+
 	private volatile boolean terminationRequested;
 
 	// --------------------------------------------------------------------------------------------
@@ -105,7 +104,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 				}
 			}
 		}
-		
+
 		TaskConfig config = getLastTasksConfig();
 		isWorksetIteration = config.getIsWorksetIteration();
 		isWorksetUpdate = config.getIsWorksetUpdate();
@@ -134,7 +133,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 		} else {
 			reinstantiateDriver();
 			resetAllInputs();
-			
+
 			// re-read the iterative broadcast variables
 			for (int i : this.iterativeBroadcastInputs) {
 				final String name = getTaskConfig().getBroadcastInputName(i);
@@ -144,7 +143,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 		// call the parent to execute the superstep
 		super.run();
-		
+
 		// release the iterative broadcast variables
 		for (int i : this.iterativeBroadcastInputs) {
 			final String name = getTaskConfig().getBroadcastInputName(i);
@@ -244,8 +243,9 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 					@SuppressWarnings("unchecked")
 					MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
 					Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
-					while ((o = inIter.next(o)) != null);
-					
+					while ((o = inIter.next(o)) != null) {
+					}
+
 					if (!reader.isFinished()) {
 						// also reset the end-of-superstep state
 						reader.startNextSuperstep();
@@ -253,17 +253,17 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 				}
 			}
 		}
-		
+
 		for (int inputNum : this.iterativeBroadcastInputs) {
 			MutableReader<?> reader = this.broadcastInputReaders[inputNum];
 
 			if (!reader.isFinished()) {
-				
+
 				// sanity check that the BC input is at the end of the superstep
 				if (!reader.hasReachedEndOfSuperstep()) {
 					throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
 				}
-				
+
 				reader.startNextSuperstep();
 			}
 		}
@@ -291,11 +291,11 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 	/**
 	 * Creates a new {@link WorksetUpdateOutputCollector}.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 *
+	 * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
 	 * workset.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
+	 *
+	 * <p>If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
 	 * collect(T) of the delegate.
 	 *
 	 * @param delegate null -OR- the delegate on which to call collect() by the newly created collector
@@ -313,13 +313,13 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 
 	/**
 	 * Creates a new solution set update output collector.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 *
+	 * <p>This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
 	 * solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
 	 * {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
 	 * {@link SolutionSetUpdateOutputCollector} is created.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
+	 *
+	 * <p>If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
 	 * collect(T) of the delegate.
 	 *
 	 * @param delegate null -OR- a delegate collector to be called by the newly created collector
@@ -328,7 +328,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	 */
 	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
 		Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-		
+
 		Object ss = solutionSetBroker.get(brokerKey());
 		if (ss instanceof CompactingHashTable) {
 			@SuppressWarnings("unchecked")
@@ -363,7 +363,7 @@ public abstract class AbstractIterativeTask<S extends Function, OT> extends Batc
 	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
 
 		public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
-											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulatorMap, MetricGroup metrics) {
+											Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?, ?>> accumulatorMap, MetricGroup metrics) {
 			super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 575072d..b673ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.operators.Driver;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.util.JoinHashMap;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -39,6 +27,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
 import org.apache.flink.runtime.iterative.concurrent.Broker;
@@ -54,12 +46,20 @@ import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * The head is responsible for coordinating an iteration and can run a
  * {@link Driver} inside. It will read
@@ -71,11 +71,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * the second iteration, the input for the head is the output of the tail, transmitted through the backchannel. Once the
  * iteration is done, the head
  * will send a {@link TerminationEvent} to all its connected tasks, signaling them to shut down.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
+ *
+ * <p>Assumption on the ordering of the outputs: - The first n output gates write to channels that go to the tasks of the
  * step function. - The next m output gates go to the tasks that consume the final solution. - The last output gate
  * connects to the synchronization task.
- * 
+ *
  * @param <X>
  *        The type of the bulk partial solution / solution set and the final output.
  * @param <Y>
@@ -131,8 +131,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 	}
 
 	/**
-	 * the iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
-	 * hands it to the iteration tail via a {@link Broker} singleton
+	 * The iteration head prepares the backchannel: it allocates memory, instantiates a {@link BlockingBackChannel} and
+	 * hands it to the iteration tail via a {@link Broker} singleton.
 	 **/
 	private BlockingBackChannel initBackChannel() throws Exception {
 
@@ -154,7 +154,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 		return backChannel;
 	}
-	
+
 	private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
 		// get some memory
 		double hashjoinMemorySize = config.getRelativeSolutionSetMemory();
@@ -162,7 +162,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
 		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer(userCodeClassLoader);
 		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator(userCodeClassLoader);
-	
+
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
 
@@ -194,27 +194,27 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			}
 		}
 	}
-	
+
 	private <BT> JoinHashMap<BT> initJoinHashMap() {
 		TypeSerializerFactory<BT> solutionTypeSerializerFactory = config.getSolutionSetSerializer
 				(getUserCodeClassLoader());
 		TypeComparatorFactory<BT> solutionTypeComparatorFactory = config.getSolutionSetComparator
 				(getUserCodeClassLoader());
-	
+
 		TypeSerializer<BT> solutionTypeSerializer = solutionTypeSerializerFactory.getSerializer();
 		TypeComparator<BT> solutionTypeComparator = solutionTypeComparatorFactory.createComparator();
-		
+
 		return new JoinHashMap<BT>(solutionTypeSerializer, solutionTypeComparator);
 	}
-	
+
 	private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		solutionSet.open();
 		solutionSet.buildTableWithUniqueKey(solutionSetInput);
 	}
-	
+
 	private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
 		TypeSerializer<X> serializer = solutionTypeSerializer.getSerializer();
-		
+
 		X next;
 		while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
 			solutionSet.insertOrReplace(next);
@@ -232,12 +232,12 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 	public void run() throws Exception {
 		final String brokerKey = brokerKey();
 		final int workerIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
-		
+
 		final boolean objectSolutionSet = config.isSolutionSetUnmanaged();
 
 		CompactingHashTable<X> solutionSet = null; // if workset iteration
 		JoinHashMap<X> solutionSetObjectMap = null; // if workset iteration with unmanaged solution set
-		
+
 		boolean waitForSolutionSetUpdate = config.getWaitForSolutionSetUpdate();
 		boolean isWorksetIteration = config.getIsWorksetIteration();
 
@@ -245,7 +245,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			/* used for receiving the current iteration result from iteration tail */
 			SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
 			SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-			
+
 			BlockingBackChannel backChannel = initBackChannel();
 			SuperstepBarrier barrier = initSuperstepBarrier();
 			SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;
@@ -262,7 +262,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 				// setup the index for the solution set
 				@SuppressWarnings("unchecked")
 				MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) createInputIterator(inputReaders[initialSolutionSetInput], solutionTypeSerializer);
-				
+
 				// read the initial solution set
 				if (objectSolutionSet) {
 					solutionSetObjectMap = initJoinHashMap();
@@ -284,7 +284,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 				@SuppressWarnings("unchecked")
 				TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) feedbackTypeSerializer;
 				solutionTypeSerializer = solSer;
-				
+
 				// = termination Criterion tail
 				if (waitForSolutionSetUpdate) {
 					solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
@@ -352,7 +352,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 					String[] globalAggregateNames = barrier.getAggregatorNames();
 					Value[] globalAggregates = barrier.getAggregates();
 					aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);
-					
+
 					nextStepKickoff.triggerNextSuperstep();
 				}
 			}
@@ -398,7 +398,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			out.collect(record);
 		}
 	}
-	
+
 	private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
 		final MutableObjectIterator<X> results = hashTable.getEntryIterator();
 		final Collector<X> output = this.finalOutputCollector;
@@ -408,7 +408,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 			output.collect(record);
 		}
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
 		final Collector<X> output = this.finalOutputCollector;
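
The head-to-tail handover visible in this file runs through per-key broker singletons. A sketch of the pattern, with the caveat that the exact Broker method names (handIn, getAndRemove) are taken from the runtime's concurrent package and should be treated as assumptions here:

    // head side: allocate the back channel and publish it under the broker key
    BlockingBackChannel channel = new BlockingBackChannel(updateBuffer);
    BlockingBackChannelBroker.instance().handIn(brokerKey, channel);

    // tail side (scheduled on the same instance): look it up under the same key
    BlockingBackChannel sameChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey);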

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 16a7008..c5fd133 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.iterative.task;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
 import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,11 +35,11 @@ import java.io.IOException;
 
 /**
  * An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.Driver} inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore
+ *
+ * <p>It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to its connected tasks. Furthermore,
  * intermediate tasks can also update the iteration state, either the workset or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
  * a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the solution set. In this case
  * this task must be scheduled on the same instance as the head.
  */
@@ -64,7 +65,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 			if (isSolutionSetUpdate) {
 				throw new IllegalStateException("Plan bug: Intermediate task performs workset and solutions set update.");
 			}
-			
+
 			Collector<OT> outputCollector = createWorksetUpdateOutputCollector(delegate);
 
 			// we need the WorksetUpdateOutputCollector separately to count the collected elements
@@ -80,7 +81,7 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 
 	@Override
 	public void run() throws Exception {
-		
+
 		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
 
 		while (this.running && !terminationRequested()) {
@@ -98,19 +99,19 @@ public class IterationIntermediateTask<S extends Function, OT> extends AbstractI
 				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
 				worksetAggregator.aggregate(numCollected);
 			}
-			
+
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
-			
+
 			// let the successors know that the end of this superstep data is reached
 			sendEndOfSuperstep();
-			
+
 			if (isWorksetUpdate) {
 				// notify iteration head if responsible for workset update
 				worksetBackChannel.notifyOfEndOfSuperstep();
 			}
-			
+
 			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
 
 			if (terminated) {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 11a8cfa..6a38fcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -16,31 +16,31 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.types.IntValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * The task responsible for synchronizing all iteration heads, implemented as an output task. This task
  * will never see any data.
@@ -52,13 +52,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
 
 	private MutableRecordReader<IntValue> headEventReader;
-	
+
 	private SyncEventHandler eventHandler;
 
 	private ConvergenceCriterion<Value> convergenceCriterion;
 
 	private ConvergenceCriterion<Value> implicitConvergenceCriterion;
-	
+
 	private Map<String, Aggregator<?>> aggregators;
 
 	private String convergenceAggregatorName;
@@ -66,13 +66,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private String implicitConvergenceAggregatorName;
 
 	private int currentIteration = 1;
-	
+
 	private int maxNumberOfIterations;
 
 	private final AtomicBoolean terminated = new AtomicBoolean(false);
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void invoke() throws Exception {
 		this.headEventReader = new MutableRecordReader<>(
@@ -80,13 +80,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 				getEnvironment().getTaskManagerInfo().getTmpDirectories());
 
 		TaskConfig taskConfig = new TaskConfig(getTaskConfiguration());
-		
+
 		// store all aggregators
 		this.aggregators = new HashMap<>();
 		for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(getUserCodeClassLoader())) {
 			aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
 		}
-		
+
 		// store the aggregator convergence criterion
 		if (taskConfig.usesConvergenceCriterion()) {
 			convergenceCriterion = taskConfig.getConvergenceCriterion(getUserCodeClassLoader());
@@ -100,9 +100,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 			implicitConvergenceAggregatorName = taskConfig.getImplicitConvergenceCriterionAggregatorName();
 			Preconditions.checkNotNull(implicitConvergenceAggregatorName);
 		}
-		
+
 		maxNumberOfIterations = taskConfig.getNumberOfIterations();
-		
+
 		// set up the event handler
 		int numEventsTillEndOfSuperstep = taskConfig.getNumberOfEventsUntilInterruptInIterativeGate(0);
 		eventHandler = new SyncEventHandler(numEventsTillEndOfSuperstep, aggregators,
@@ -110,7 +110,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 		headEventReader.registerTaskEventListener(eventHandler, WorkerDoneEvent.class);
 
 		IntValue dummy = new IntValue();
-		
+
 		while (!terminationRequested()) {
 
 			if (log.isInfoEnabled()) {
@@ -140,7 +140,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 
 				AllWorkersDoneEvent allWorkersDoneEvent = new AllWorkersDoneEvent(aggregators);
 				sendToAllWorkers(allWorkersDoneEvent);
-				
+
 				// reset all aggregators
 				for (Aggregator<?> agg : aggregators.values()) {
 					agg.reset();
@@ -165,7 +165,7 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 			if (aggregator == null) {
 				throw new RuntimeException("Error: Aggregator for convergence criterion was null.");
 			}
-			
+
 			Value aggregate = aggregator.getAggregate();
 
 			if (convergenceCriterion.isConverged(currentIteration, aggregate)) {
@@ -194,14 +194,14 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 				return true;
 			}
 		}
-		
+
 		return false;
 	}
 
 	private void readHeadEventChannel(IntValue rec) throws IOException {
 		// reset the handler
 		eventHandler.resetEndOfSuperstep();
-		
+
 		// read (and thereby process all events in the handler's event handling functions)
 		try {
 			if (this.headEventReader.next(rec)) {
@@ -222,9 +222,9 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 	private String formatLogString(String message) {
 		return BatchTask.constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean terminationRequested() {
 		return terminated.get();
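
The convergence check in this task boils down to one interface call per superstep. An illustrative criterion in the spirit of the WorksetEmptyConvergenceCriterion used for the implicit check (the class below is hypothetical, but the ConvergenceCriterion signature is the one imported above):

    import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
    import org.apache.flink.types.LongValue;

    /** Converges as soon as the tracked count drops to zero. */
    public class ZeroCountConvergence implements ConvergenceCriterion<LongValue> {
        @Override
        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() == 0L;
        }
    }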

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
index 9e0b560..3ec3a8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
@@ -28,15 +26,18 @@ import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker
 import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
 import org.apache.flink.util.Collector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send back to the {@link IterationHeadTask} via
+ *
+ * <p>If the iteration state is updated, the output of this task will be sent back to the {@link IterationHeadTask} via
  * a BackChannel for the workset -OR- a HashTable for the solution set. Therefore this
  * task must be scheduled on the same instance as the head. It's also possible for the tail to update *both* the workset
  * and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
+ *
+ * <p>If there is a separate solution set tail, the iteration head has to make sure to wait for it to finish.
  */
 public class IterationTailTask<S extends Function, OT> extends AbstractIterativeTask<S, OT> {
 
@@ -45,7 +46,6 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 	private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
 
 	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-	
 
 	@Override
 	protected void initialize() throws Exception {
@@ -80,6 +80,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 				outputCollector = new Collector<OT>() {
 					@Override
 					public void collect(OT record) {}
+
 					@Override
 					public void close() {}
 				};
@@ -95,9 +96,9 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 
 	@Override
 	public void run() throws Exception {
-		
+
 		SuperstepKickoffLatch nextSuperStepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
-		
+
 		while (this.running && !terminationRequested()) {
 
 			if (log.isInfoEnabled()) {
@@ -119,7 +120,7 @@ public class IterationTailTask<S extends Function, OT> extends AbstractIterative
 			if (log.isInfoEnabled()) {
 				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
 			}
-			
+
 			if (isWorksetUpdate) {
 				// notify iteration head if responsible for workset update
 				worksetBackChannel.notifyOfEndOfSuperstep();

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
index 7beb4c7..c4a30c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/RuntimeAggregatorRegistry.java
@@ -18,56 +18,56 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.types.Value;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
  * Registry for the {@link Aggregator}s of an iteration, also holding the global aggregate values of the previous superstep.
  */
 public class RuntimeAggregatorRegistry {
-	
+
 	private final Map<String, Aggregator<?>> aggregators;
-	
+
 	private final Map<String, Value> previousGlobalAggregate;
-	
+
 	public RuntimeAggregatorRegistry(Collection<AggregatorWithName<?>> aggs) {
 		this.aggregators = new HashMap<String, Aggregator<?>>();
 		this.previousGlobalAggregate = new HashMap<String, Value>();
-		
+
 		for (AggregatorWithName<?> agg : aggs) {
 			this.aggregators.put(agg.getName(), agg.getAggregator());
 		}
 	}
-	
+
 	public Value getPreviousGlobalAggregate(String name) {
 		return this.previousGlobalAggregate.get(name);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	public <T extends Aggregator<?>> T getAggregator(String name) {
 		return (T) this.aggregators.get(name);
 	}
-	
+
 	public Map<String, Aggregator<?>> getAllAggregators() {
 		return this.aggregators;
 	}
-	
+
 	public void updateGlobalAggregatesAndReset(String[] names, Value[] aggregates) {
 		if (names == null || aggregates == null || names.length != aggregates.length) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		// add global aggregates
-		for (int i = 0 ; i < names.length; i++) {
+		for (int i = 0; i < names.length; i++) {
 			this.previousGlobalAggregate.put(names[i], aggregates[i]);
 		}
-		
+
 		// reset all aggregators
 		for (Aggregator<?> agg : this.aggregators.values()) {
 			agg.reset();
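
How a step function and the sync task meet in this registry, as a short sketch (the aggregator name is illustrative; the calls are the ones defined above plus TaskConfig.getIterationAggregators from this commit):

    RuntimeAggregatorRegistry registry =
        new RuntimeAggregatorRegistry(taskConfig.getIterationAggregators(userCodeClassLoader));

    // during a superstep, a task contributes to a named aggregator ...
    LongSumAggregator updates = registry.getAggregator("updated-elements");
    updates.aggregate(1L);

    // ... and after the barrier, the globally merged value of the previous
    // superstep is available under the same name
    LongValue lastGlobal = (LongValue) registry.getPreviousGlobalAggregate("updated-elements");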

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index 71c15b1..45843ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -18,27 +18,30 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Map;
+
+/**
+ * Listener for {@link WorkerDoneEvent}s which merges the aggregator values reported
+ * by the iteration tasks and signals the end of the superstep.
+ */
 public class SyncEventHandler implements EventListener<TaskEvent> {
-	
+
 	private final ClassLoader userCodeClassLoader;
-	
+
 	private final Map<String, Aggregator<?>> aggregators;
 
 	private final int numberOfEventsUntilEndOfSuperstep;
 
 	private int workerDoneEventCounter;
-	
-	private boolean endOfSuperstep;
 
+	private boolean endOfSuperstep;
 
 	public SyncEventHandler(int numberOfEventsUntilEndOfSuperstep, Map<String, Aggregator<?>> aggregators, ClassLoader userCodeClassLoader) {
 		Preconditions.checkArgument(numberOfEventsUntilEndOfSuperstep > 0);
@@ -60,7 +63,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 		if (this.endOfSuperstep) {
 			throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
 		}
-		
+
 		workerDoneEventCounter++;
 
 		String[] aggNames = workerDoneEvent.getAggregatorNames();
@@ -69,7 +72,7 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 		if (aggNames.length != aggregates.length) {
 			throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
 		}
-		
+
 		for (int i = 0; i < aggNames.length; i++) {
 			@SuppressWarnings("unchecked")
 			Aggregator<Value> aggregator = (Aggregator<Value>) this.aggregators.get(aggNames[i]);
@@ -81,11 +84,11 @@ public class SyncEventHandler implements EventListener<TaskEvent> {
 			Thread.currentThread().interrupt();
 		}
 	}
-	
+
 	public boolean isEndOfSuperstep() {
 		return this.endOfSuperstep;
 	}
-	
+
 	public void resetEndOfSuperstep() {
 		this.endOfSuperstep = false;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
index d6f7544..9d952ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/Terminable.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.task;
 
 /**
- * Models the functionality that the termination of an iterative task can be requested from outside
+ * Models the ability to request the termination of an iterative task from the outside.
  */
 public interface Terminable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
index d22a23b..c9015e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BlockingBackChannelTest.java
@@ -16,25 +16,26 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link BlockingBackChannel}.
+ */
 public class BlockingBackChannelTest {
 
 	private static final int NUM_ITERATIONS = 3;

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
index 3f2c243..e12cb32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/BrokerTest.java
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.util.Preconditions;
+
 import com.google.common.collect.Lists;
 import org.junit.Test;
 
-import org.apache.flink.util.Preconditions;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -35,6 +34,9 @@ import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link Broker}.
+ */
 public class BrokerTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
index 798d9f5..82ddac7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/StringPair.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
 class StringPair {

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
index 2f26670..1187adf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepBarrierTest.java
@@ -16,19 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.concurrent;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Random;
-
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
+
 import org.junit.Test;
 
+import java.util.Random;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SuperstepBarrier}.
+ */
 public class SuperstepBarrierTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/834c5277/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
index 8c8deb2..90555d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/iterative/concurrent/SuperstepKickoffLatchTest.java
@@ -21,30 +21,33 @@ package org.apache.flink.runtime.iterative.concurrent;
 import org.junit.Assert;
 import org.junit.Test;
 
+/**
+ * Tests for {@link SuperstepKickoffLatch}.
+ */
 public class SuperstepKickoffLatchTest {
 
 	@Test
 	public void testWaitFromOne() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			latch.triggerNextSuperstep();
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -54,28 +57,28 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitAlreadyFulfilled() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -85,14 +88,14 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitIncorrect() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			try {
 				latch.awaitStartOfSuperstepOrTermination(2);
 				Assert.fail("should throw exception");
@@ -106,29 +109,29 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitIncorrectAsync() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 2);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			Thread.sleep(100);
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				if (!(w.getError() instanceof IllegalStateException)) {
 					throw new Exception("wrong exception type " + w.getError());
@@ -142,29 +145,29 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWaitForTermination() {
 		try {
 			SuperstepKickoffLatch latch = new SuperstepKickoffLatch();
 			latch.triggerNextSuperstep();
 			latch.triggerNextSuperstep();
-			
+
 			Waiter w = new Waiter(latch, 4);
 			Thread waiter = new Thread(w);
 			waiter.setDaemon(true);
 			waiter.start();
-			
+
 			WatchDog wd = new WatchDog(waiter, 2000);
 			wd.start();
-			
+
 			latch.signalTermination();
-			
+
 			wd.join();
 			if (wd.getError() != null) {
 				throw wd.getError();
 			}
-			
+
 			if (w.getError() != null) {
 				throw w.getError();
 			}
@@ -174,16 +177,15 @@ public class SuperstepKickoffLatchTest {
 			Assert.fail("Error: " + t.getMessage());
 		}
 	}
-	
+
 	private static class Waiter implements Runnable {
 
 		private final SuperstepKickoffLatch latch;
-		
+
 		private final int waitFor;
-		
+
 		private volatile Throwable error;
-		
-		
+
 		public Waiter(SuperstepKickoffLatch latch, int waitFor) {
 			this.latch = latch;
 			this.waitFor = waitFor;
@@ -198,37 +200,37 @@ public class SuperstepKickoffLatchTest {
 				this.error = t;
 			}
 		}
-		
+
 		public Throwable getError() {
 			return error;
 		}
 	}
-	
+
 	private static class WatchDog extends Thread {
-		
+
 		private final Thread toWatch;
-		
+
 		private final long timeOut;
-		
+
 		private volatile Throwable failed;
-		
+
 		public WatchDog(Thread toWatch, long timeout) {
 			setDaemon(true);
 			setName("Watchdog");
 			this.toWatch = toWatch;
 			this.timeOut = timeout;
 		}
-		
+
 		@SuppressWarnings("deprecation")
 		@Override
 		public void run() {
 			try {
 				toWatch.join(timeOut);
-				
+
 				if (toWatch.isAlive()) {
 					this.failed = new Exception("timed out");
 					toWatch.interrupt();
-					
+
 					toWatch.join(2000);
 					if (toWatch.isAlive()) {
 						toWatch.stop();
@@ -239,7 +241,7 @@ public class SuperstepKickoffLatchTest {
 				failed = t;
 			}
 		}
-		
+
 		public Throwable getError() {
 			return failed;
 		}
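
As a usage note, a minimal sketch of the latch protocol these tests exercise; the method names are taken from the test code above, while the thread handling and the superstep numbers are purely illustrative:

	SuperstepKickoffLatch latch = new SuperstepKickoffLatch();

	// A worker blocks until superstep 2 starts or termination is signaled.
	Thread worker = new Thread(() -> {
		try {
			latch.awaitStartOfSuperstepOrTermination(2);
		} catch (Exception e) {
			// interrupted, or the awaited superstep has already passed
		}
	});
	worker.setDaemon(true);
	worker.start();

	latch.triggerNextSuperstep();   // releases waiters for superstep 2
	// ... once the iteration converges:
	latch.signalTermination();      // releases any remaining waiters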


[3/7] flink git commit: [FLINK-7070] Use properly built custom jar in ScalaShellITCase

Posted by ch...@apache.org.
[FLINK-7070] Use properly built custom jar in ScalaShellITCase

Before, the external jar loading tests were using the flink-ml jar
without the scala shell package actually declaring it as a dependency.
Now we build our own jar and have no unlisted dependencies anymore.

This closes #4249.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/343a8042
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/343a8042
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/343a8042

Branch: refs/heads/master
Commit: 343a804209dc5eb030b18ccb4c03afc459b150c5
Parents: 41806ba
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 3 17:09:25 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 11:37:12 2017 +0200

----------------------------------------------------------------------
 flink-scala-shell/pom.xml                       | 55 ++++++++++++++++
 .../test-scalashell-customjar-assembly.xml      | 35 ++++++++++
 .../flink/api/scala/ScalaShellITCase.scala      | 67 ++++----------------
 .../flink/api/scala/jar/TestingData.scala       | 26 ++++++++
 .../apache/flink/api/scala/jar/package.scala    | 25 ++++++++
 5 files changed, 152 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/pom.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/pom.xml b/flink-scala-shell/pom.xml
index 6f490e4..e312699 100644
--- a/flink-scala-shell/pom.xml
+++ b/flink-scala-shell/pom.xml
@@ -242,6 +242,61 @@ under the License.
 				</executions>
 			</plugin>
 
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>create-library-loading-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<!--<archive>-->
+								<!--<manifest>-->
+									<!--<mainClass>org.apache.flink.test.classloading.jar.KMeansForTest</mainClass>-->
+								<!--</manifest>-->
+							<!--</archive>-->
+							<finalName>customjar</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-scalashell-customjar-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+
+			<!--Remove the classes in the jar package from the test-classes directory since they
+			mustn't be in the classpath when running ScalaShellITCase to actually test loading
+			of external jars.-->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>remove-classloading-test-dependencies</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/jar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 		</plugins>
 	</build>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml b/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
new file mode 100644
index 0000000..ef20cfd
--- /dev/null
+++ b/flink-scala-shell/src/test/assembly/test-scalashell-customjar-assembly.xml
@@ -0,0 +1,35 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/api/scala/jar/*</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 0e89da3..ce08304 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -169,36 +169,19 @@ class ScalaShellITCase extends TestLogger {
   def testSubmissionOfExternalLibraryBatch: Unit = {
     val input =
       """
-         import org.apache.flink.ml.math._
-         val denseVectors = benv.fromElements[Vector](DenseVector(1.0, 2.0, 3.0))
-         denseVectors.print()
+         import org.apache.flink.api.scala.jar.TestingData
+         val source = benv.fromCollection(TestingData.elements)
+         source.print()
       """.stripMargin
 
-    // find jar file that contains the ml code
-    var externalJar = ""
-    val folder = findLibraryFolder(
-      "../flink-libraries/flink-ml/target/",
-      "../../flink-libraries/flink-ml/target/")
-
-    val listOfFiles = folder.listFiles()
-
-    for (i <- listOfFiles.indices) {
-      val filename: String = listOfFiles(i).getName
-      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
-        ".jar")) {
-        externalJar = listOfFiles(i).getAbsolutePath
-      }
-    }
-
-    assert(externalJar != "")
-
-    val output: String = processInShell(input, Option(externalJar))
+    val output: String = processInShell(input, Option("customjar-test-jar.jar"))
 
     Assert.assertFalse(output.contains("failed"))
     Assert.assertFalse(output.contains("error"))
     Assert.assertFalse(output.contains("Exception"))
 
-    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+
+    Assert.assertTrue(output.contains("\nHELLO 42"))
   }
 
   /** Submit external library */
@@ -206,37 +189,19 @@ class ScalaShellITCase extends TestLogger {
   def testSubmissionOfExternalLibraryStream: Unit = {
     val input =
       """
-        import org.apache.flink.ml.math._
-        val denseVectors = senv.fromElements[Vector](DenseVector(1.0, 2.0, 3.0))
-        denseVectors.print()
+        import org.apache.flink.api.scala.jar.TestingData
+        val source = senv.fromCollection(TestingData.elements)
+        source.print()
         senv.execute
       """.stripMargin
 
-    // find jar file that contains the ml code
-    var externalJar = ""
-    val folder = findLibraryFolder(
-      "../flink-libraries/flink-ml/target/",
-      "../../flink-libraries/flink-ml/target/")
-
-    val listOfFiles = folder.listFiles()
-
-    for (i <- listOfFiles.indices) {
-      val filename: String = listOfFiles(i).getName
-      if (!filename.contains("test") && !filename.contains("original") && filename.contains(
-        ".jar")) {
-        externalJar = listOfFiles(i).getAbsolutePath
-      }
-    }
-
-    assert(externalJar != "")
-
-    val output: String = processInShell(input, Option(externalJar))
+    val output: String = processInShell(input, Option("customjar-test-jar.jar"))
 
     Assert.assertFalse(output.contains("failed"))
     Assert.assertFalse(output.contains("error"))
     Assert.assertFalse(output.contains("Exception"))
 
-    Assert.assertTrue(output.contains("\nDenseVector(1.0, 2.0, 3.0)"))
+    Assert.assertTrue(output.contains("\nHELLO 42"))
   }
 
 
@@ -413,14 +378,4 @@ object ScalaShellITCase {
       case _ => throw new IllegalStateException("The cluster has not been started.")
     }
   }
-
-  def findLibraryFolder(paths: String*): File = {
-    for (path <- paths) {
-      val folder = new File(path)
-      if (folder.exists()) {
-        return folder
-      }
-    }
-    throw new RuntimeException("Library folder not found in any of the supplied paths!")
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
new file mode 100644
index 0000000..428a8e6
--- /dev/null
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/TestingData.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.jar
+
+/**
+ * Testing data for [[org.apache.flink.api.scala.ScalaShellITCase]]. This will be put into a
+ * separate jar file to test loading of external libraries.
+ */
+object TestingData {
+  val elements = Seq("HELLO 42", "CIAO", "BLA", "BLU")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/343a8042/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
new file mode 100644
index 0000000..51303f2
--- /dev/null
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/jar/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala
+
+/**
+ * Custom objects for use in testing loading of external jars in [[ScalaShellITCase]].
+ */
+package object jar {
+
+}


[7/7] flink git commit: [FLINK-6826] [runtime] Activate checkstyle for runtime/net

Posted by ch...@apache.org.
[FLINK-6826] [runtime] Activate checkstyle for runtime/net

This closes #4065.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85853eda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85853eda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85853eda

Branch: refs/heads/master
Commit: 85853edab3e76ec83790f1b9c8f51bd2f6afe087
Parents: c59537e
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 23:25:16 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:33 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../flink/runtime/net/ConnectionUtils.java      | 57 ++++++++++----------
 .../org/apache/flink/runtime/net/SSLUtils.java  | 15 +++---
 .../flink/runtime/net/ConnectionUtilsTest.java  | 17 +++---
 .../apache/flink/runtime/net/SSLUtilsTest.java  | 26 ++++-----
 5 files changed, 59 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 1fff305..a21f753 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -447,7 +447,6 @@ under the License.
 						**/runtime/memory/**,
 						**/runtime/messages/**,
 						**/runtime/minicluster/**,
-						**/runtime/net/**,
 						**/runtime/operators/**,
 						**/runtime/plugable/**,
 						**/runtime/query/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 75a7ebe..673d27c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.net;
 
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -32,25 +39,17 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import scala.concurrent.duration.FiniteDuration;
 
-
 /**
  * Utilities to determine the network interface and address that should be used to bind the
  * TaskManager communication to.
- * 
+ *
  * <p>Implementation note: This class uses {@code System.nanoTime()} to measure elapsed time, because
  * that is not susceptible to clock changes.
  */
 public class ConnectionUtils {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);
 
 	private static final long MIN_SLEEP_TIME = 50;
@@ -61,15 +60,15 @@ public class ConnectionUtils {
 	 * There is only a state transition if the current state failed to determine the address.
 	 */
 	private enum AddressDetectionState {
-		/** Connect from interface returned by InetAddress.getLocalHost() **/
+		/** Connect from interface returned by InetAddress.getLocalHost(). **/
 		LOCAL_HOST(200),
 		/** Detect own IP address based on the target IP address. Look for common prefix */
 		ADDRESS(50),
-		/** Try to connect on all Interfaces and all their addresses with a low timeout */
+		/** Try to connect on all Interfaces and all their addresses with a low timeout. */
 		FAST_CONNECT(50),
-		/** Try to connect on all Interfaces and all their addresses with a long timeout */
+		/** Try to connect on all Interfaces and all their addresses with a long timeout. */
 		SLOW_CONNECT(1000),
-		/** Choose any non-loopback address */
+		/** Choose any non-loopback address. */
 		HEURISTIC(0);
 
 		private final int timeout;
@@ -90,11 +89,11 @@ public class ConnectionUtils {
 	 * given target, so it only succeeds if the target socket address actually accepts
 	 * connections. The method tries various strategies multiple times and uses an exponential
 	 * backoff timer between tries.
-	 * <p>
-	 * If no connection attempt was successful after the given maximum time, the method
+	 *
+	 * <p>If no connection attempt was successful after the given maximum time, the method
 	 * will choose some address based on heuristics (excluding link-local and loopback addresses.)
-	 * <p>
-	 * This method will initially not log on info level (to not flood the log while the
+	 *
+	 * <p>This method will initially not log on info level (to not flood the log while the
 	 * backoff time is still very low). It will start logging after a certain time
 	 * has passed.
 	 *
@@ -104,8 +103,7 @@ public class ConnectionUtils {
 	 * @param startLoggingAfter The time after which the method will log on INFO level.
 	 */
 	public static InetAddress findConnectingAddress(InetSocketAddress targetAddress,
-							long maxWaitMillis, long startLoggingAfter) throws IOException
-	{
+							long maxWaitMillis, long startLoggingAfter) throws IOException {
 		if (targetAddress == null) {
 			throw new NullPointerException("targetAddress must not be null");
 		}
@@ -188,11 +186,11 @@ public class ConnectionUtils {
 	 */
 	private static InetAddress tryLocalHostBeforeReturning(
 				InetAddress preliminaryResult, SocketAddress targetAddress, boolean logging) throws IOException {
-		
+
 		InetAddress localhostName = InetAddress.getLocalHost();
-		
+
 		if (preliminaryResult.equals(localhostName)) {
-			// preliminary result is equal to the local host name 
+			// preliminary result is equal to the local host name
 			return preliminaryResult;
 		}
 		else if (tryToConnect(localhostName, targetAddress, AddressDetectionState.SLOW_CONNECT.getTimeout(), logging)) {
@@ -209,7 +207,7 @@ public class ConnectionUtils {
 
 	/**
 	 * Try to find a local address which allows us to connect to the targetAddress using the given
-	 * strategy
+	 * strategy.
 	 *
 	 * @param strategy Depending on the strategy, the method will enumerate all interfaces, trying to connect
 	 *                 to the target address
@@ -220,8 +218,7 @@ public class ConnectionUtils {
 	 */
 	private static InetAddress findAddressUsingStrategy(AddressDetectionState strategy,
 														InetSocketAddress targetAddress,
-														boolean logging) throws IOException
-	{
+														boolean logging) throws IOException {
 		// try LOCAL_HOST strategy independent of the network interfaces
 		if (strategy == AddressDetectionState.LOCAL_HOST) {
 			InetAddress localhostName;
@@ -316,8 +313,7 @@ public class ConnectionUtils {
 	 * @throws IOException Thrown if the socket cleanup fails.
 	 */
 	private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSocket,
-										int timeout, boolean logFailed) throws IOException
-	{
+										int timeout, boolean logFailed) throws IOException {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
 					+ " with timeout " + timeout);
@@ -341,6 +337,9 @@ public class ConnectionUtils {
 		}
 	}
 
+	/**
+	 * A {@link LeaderRetrievalListener} that allows retrieving an {@link InetAddress} for the current leader.
+	 */
 	public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
 
 		private static final FiniteDuration defaultLoggingDelay = new FiniteDuration(400, TimeUnit.MILLISECONDS);
@@ -351,7 +350,7 @@ public class ConnectionUtils {
 			NEWLY_RETRIEVED
 		}
 
-		final private Object retrievalLock = new Object();
+		private final Object retrievalLock = new Object();
 
 		private String akkaURL;
 		private LeaderRetrievalState retrievalState = LeaderRetrievalState.NOT_RETRIEVED;
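
As a usage note, a minimal sketch of calling the method whose Javadoc is cleaned up above; the target endpoint is a placeholder, and the timeout values mirror those used in ConnectionUtilsTest:

	// Find a local address from which the (hypothetical) JobManager endpoint is reachable.
	InetSocketAddress target = new InetSocketAddress("jobmanager.example.org", 6123);

	// Try the detection strategies with exponential backoff for at most 10 s,
	// and start logging on INFO level after 2 s.
	InetAddress bindAddress = ConnectionUtils.findConnectingAddress(target, 10000, 2000);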

http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
index 015b3d6..663d221 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
@@ -31,6 +31,7 @@ import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.TrustManagerFactory;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.net.ServerSocket;
@@ -38,13 +39,13 @@ import java.security.KeyStore;
 import java.util.Arrays;
 
 /**
- * Common utilities to manage SSL transport settings
+ * Common utilities to manage SSL transport settings.
  */
 public class SSLUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(SSLUtils.class);
 
 	/**
-	 * Retrieves the global ssl flag from configuration
+	 * Retrieves the global ssl flag from configuration.
 	 *
 	 * @param sslConfig
 	 *        The application configuration
@@ -58,7 +59,7 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Sets SSl version and cipher suites for SSLServerSocket
+	 * Sets SSL version and cipher suites for SSLServerSocket.
 	 * @param socket
 	 *        Socket to be handled
 	 * @param config
@@ -81,7 +82,7 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Sets SSL version and cipher suites for SSLEngine
+	 * Sets SSL version and cipher suites for SSLEngine.
 	 * @param engine
 	 *        SSLEngine to be handled
 	 * @param config
@@ -93,7 +94,7 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Sets SSL options to verify peer's hostname in the certificate
+	 * Sets SSL options to verify peer's hostname in the certificate.
 	 *
 	 * @param sslConfig
 	 *        The application configuration
@@ -112,7 +113,7 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Creates the SSL Context for the client if SSL is configured
+	 * Creates the SSL Context for the client if SSL is configured.
 	 *
 	 * @param sslConfig
 	 *        The application configuration
@@ -160,7 +161,7 @@ public class SSLUtils {
 	}
 
 	/**
-	 * Creates the SSL Context for the server if SSL is configured
+	 * Creates the SSL Context for the server if SSL is configured.
 	 *
 	 * @param sslConfig
 	 *        The application configuration
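
As a usage note, a hedged sketch of the server-side path through these utilities; the keystore path and passwords are placeholders, and the method and option names are inferred from this class and from SSLUtilsTest below, not confirmed beyond them:

	Configuration sslConfig = new Configuration();
	sslConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
	sslConfig.setString(SecurityOptions.SSL_KEYSTORE, "/path/to/keystore.jks");  // placeholder
	sslConfig.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "storePassword"); // placeholder
	sslConfig.setString(SecurityOptions.SSL_KEY_PASSWORD, "keyPassword");        // placeholder

	// Per the "...WithSSLDisabled" tests below, the context is only created if SSL is enabled.
	SSLContext serverContext = SSLUtils.createSSLServerContext(sslConfig);
	if (serverContext != null) {
		SSLServerSocket socket =
			(SSLServerSocket) serverContext.getServerSocketFactory().createServerSocket(0);
		// Restrict the socket to the configured SSL version and cipher suites.
		SSLUtils.setSSLVerAndCipherSuites(socket, sslConfig);
	}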

http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index b5c2819..d7c4baa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -15,14 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.runtime.net;
 
-import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.UnknownHostException;
+package org.apache.flink.runtime.net;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -31,6 +25,13 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -48,7 +49,7 @@ public class ConnectionUtilsTest {
 			// the "blocker" server socket simply does not accept connections
 			// this address is consequently "unreachable"
 			InetSocketAddress unreachable = new InetSocketAddress("localhost", blocker.getLocalPort());
-			
+
 			final long start = System.nanoTime();
 			InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85853eda/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
index a3c2b7b..87d0ccc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
@@ -15,26 +15,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.net;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLServerSocket;
+
 import java.net.ServerSocket;
-import java.util.Random;
 
-/*
- * Tests for the SSL utilities
+/**
+ * Tests for the {@link SSLUtils}.
  */
 public class SSLUtilsTest {
 
 	/**
-	 * Tests if SSL Client Context is created given a valid SSL configuration
+	 * Tests if SSL Client Context is created given a valid SSL configuration.
 	 */
 	@Test
 	public void testCreateSSLClientContext() throws Exception {
@@ -49,7 +51,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Client Context is not created if SSL is not configured
+	 * Tests if SSL Client Context is not created if SSL is not configured.
 	 */
 	@Test
 	public void testCreateSSLClientContextWithSSLDisabled() throws Exception {
@@ -62,7 +64,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Client Context creation fails with bad SSL configuration
+	 * Tests if SSL Client Context creation fails with bad SSL configuration.
 	 */
 	@Test
 	public void testCreateSSLClientContextMisconfiguration() {
@@ -81,7 +83,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Server Context is created given a valid SSL configuration
+	 * Tests if SSL Server Context is created given a valid SSL configuration.
 	 */
 	@Test
 	public void testCreateSSLServerContext() throws Exception {
@@ -97,7 +99,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Server Context is not created if SSL is disabled
+	 * Tests if SSL Server Context is not created if SSL is disabled.
 	 */
 	@Test
 	public void testCreateSSLServerContextWithSSLDisabled() throws Exception {
@@ -110,7 +112,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Server Context creation fails with bad SSL configuration
+	 * Tests if SSL Server Context creation fails with bad SSL configuration.
 	 */
 	@Test
 	public void testCreateSSLServerContextMisconfiguration() {
@@ -130,7 +132,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSL Server Context creation fails with bad SSL configuration
+	 * Tests if SSL Server Context creation fails with bad SSL configuration.
 	 */
 	@Test
 	public void testCreateSSLServerContextWithMultiProtocols() {
@@ -151,7 +153,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket
+	 * Tests if SSLUtils set the right ssl version and cipher suites for SSLServerSocket.
 	 */
 	@Test
 	public void testSetSSLVersionAndCipherSuitesForSSLServerSocket() throws Exception {
@@ -192,7 +194,7 @@ public class SSLUtilsTest {
 	}
 
 	/**
-	 * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine
+	 * Tests if SSLUtils set the right ssl version and cipher suites for SSLEngine.
 	 */
 	@Test
 	public void testSetSSLVersionAndCipherSuitesForSSLEngine() throws Exception {


[5/7] flink git commit: [FLINK-6821] [runtime] Activate checkstyle for runtime/fs

Posted by ch...@apache.org.
[FLINK-6821] [runtime] Activate checkstyle for runtime/fs

This closes #4063.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9f659e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9f659e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9f659e0

Branch: refs/heads/master
Commit: c9f659e046a7b42e79d72df74bead5809ab2fe46
Parents: 31ad802
Author: zentol <ch...@apache.org>
Authored: Fri Jun 2 21:16:52 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:31 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../runtime/fs/hdfs/HadoopBlockLocation.java    | 16 ++---
 .../runtime/fs/hdfs/HadoopDataInputStream.java  | 19 +++--
 .../runtime/fs/hdfs/HadoopDataOutputStream.java |  8 ++-
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java |  2 +-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java | 75 +++++++++-----------
 .../flink/runtime/fs/maprfs/MapRFileSystem.java | 29 ++++----
 .../fs/hdfs/HadoopDataInputStreamTest.java      |  3 +
 8 files changed, 73 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 80f95a5..1fff305 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -436,7 +436,6 @@ under the License.
 						**/runtime/deployment/**,
 						**/runtime/execution/**,
 						**/runtime/executiongraph/**,
-						**/runtime/fs/**,
 						**/runtime/heartbeat/**,
 						**/runtime/highavailability/**,
 						**/runtime/instance/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
index a1cc72c..1484c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -16,19 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.fs.hdfs;
 
+import org.apache.flink.core.fs.BlockLocation;
+
 import java.io.IOException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.core.fs.BlockLocation;
-
 /**
  * Implementation of the {@link BlockLocation} interface for the
  * Hadoop Distributed File System.
- * 
  */
 public final class HadoopBlockLocation implements BlockLocation {
 
@@ -53,8 +51,8 @@ public final class HadoopBlockLocation implements BlockLocation {
 	private String[] hostnames;
 
 	/**
-	 * Creates a new block location
-	 * 
+	 * Creates a new block location.
+	 *
 	 * @param blockLocation
 	 *        the original HDFS block location
 	 */
@@ -63,7 +61,6 @@ public final class HadoopBlockLocation implements BlockLocation {
 		this.blockLocation = blockLocation;
 	}
 
-
 	@Override
 	public String[] getHosts() throws IOException {
 
@@ -88,7 +85,7 @@ public final class HadoopBlockLocation implements BlockLocation {
 
 	/**
 	 * Looks for a domain suffix in a FQDN and strips it if present.
-	 * 
+	 *
 	 * @param originalHostname
 	 *        the original hostname, possibly an FQDN
 	 * @return the stripped hostname without the domain suffix
@@ -114,21 +111,18 @@ public final class HadoopBlockLocation implements BlockLocation {
 		return originalHostname.substring(0, index);
 	}
 
-
 	@Override
 	public long getLength() {
 
 		return this.blockLocation.getLength();
 	}
 
-
 	@Override
 	public long getOffset() {
 
 		return this.blockLocation.getOffset();
 	}
 
-
 	@Override
 	public int compareTo(final BlockLocation o) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 3cc841e..da50c4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -27,19 +27,19 @@ import java.io.IOException;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Concrete implementation of the {@link FSDataInputStream} for the Hadoop's input streams.
+ * Concrete implementation of the {@link FSDataInputStream} for Hadoop's input streams.
  * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
  */
 public final class HadoopDataInputStream extends FSDataInputStream {
 
 	/**
 	 * Minimum amount of bytes to skip forward before we issue a seek instead of discarding read.
-	 * <p>
-	 * The current value is just a magic number. In the long run, this value could become configurable, but for now it
+	 *
+	 * <p>The current value is just a magic number. In the long run, this value could become configurable, but for now it
 	 * is a conservative, relatively small value that should bring safe improvements for small skips (e.g. in reading
 	 * meta data), that would hurt the most with frequent seeks.
-	 * <p>
-	 * The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
+	 *
+	 * <p>The optimal value depends on the DFS implementation and configuration plus the underlying filesystem.
 	 * For now, this number is chosen "big enough" to provide improvements for smaller seeks, and "small enough" to
 	 * avoid disadvantages over real seeks. While the minimum should be the page size, a true optimum per system would
 	 * be the amount of bytes that can be consumed sequentially within the seektime. Unfortunately, seektime is not
@@ -47,11 +47,11 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 	 */
 	public static final int MIN_SKIP_BYTES = 1024 * 1024;
 
-	/** The internal stream */
+	/** The internal stream. */
 	private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream;
 
 	/**
-	 * Creates a new data input stream from the given Hadoop input stream
+	 * Creates a new data input stream from the given Hadoop input stream.
 	 *
 	 * @param fsDataInputStream The Hadoop input stream
 	 */
@@ -59,7 +59,6 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 		this.fsDataInputStream = checkNotNull(fsDataInputStream);
 	}
 
-
 	@Override
 	public void seek(long seekPos) throws IOException {
 		// We do some optimizations to avoid that some implementations of distributed FS perform
@@ -116,8 +115,8 @@ public final class HadoopDataInputStream extends FSDataInputStream {
 	/**
 	 * Positions the stream to the given location. In contrast to {@link #seek(long)}, this method will
 	 * always issue a "seek" command to the dfs and may not replace it by {@link #skip(long)} for small seeks.
-	 * <p>
-	 * Notice that the underlying DFS implementation can still decide to do skip instead of seek.
+	 *
+	 * <p>Notice that the underlying DFS implementation can still decide to do skip instead of seek.
 	 *
 	 * @param seekPos the position to seek to.
 	 * @throws IOException
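
The Javadoc above describes the skip-versus-seek trade-off; the following is a hedged sketch of that decision logic, not the class's literal implementation:

	long delta = seekPos - fsDataInputStream.getPos();

	if (delta > 0L && delta <= MIN_SKIP_BYTES) {
		// Short forward jump: read and discard bytes instead of issuing a DFS seek.
		long remaining = delta;
		while (remaining > 0L) {
			long skipped = fsDataInputStream.skip(remaining);
			if (skipped <= 0L) {
				break; // stream cannot skip further, fall back to a real seek
			}
			remaining -= skipped;
		}
		if (remaining > 0L) {
			fsDataInputStream.seek(seekPos);
		}
	} else if (delta != 0L) {
		// Backward or long forward jump: issue a true seek.
		fsDataInputStream.seek(seekPos);
	}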

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
index 8787181..1b8d1a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.fs.hdfs;
 
-import java.io.IOException;
-
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+import java.io.IOException;
+
+/**
+	 * Concrete implementation of the {@link FSDataOutputStream} for Hadoop's output streams.
+ * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n).
+ */
 public class HadoopDataOutputStream extends FSDataOutputStream {
 
 	private final org.apache.hadoop.fs.FSDataOutputStream fdos;

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 519791e..17bb334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -31,7 +31,7 @@ public final class HadoopFileStatus implements FileStatus {
 
 	/**
 	 * Creates a new file status from a HDFS file status.
-	 * 
+	 *
 	 * @param fileStatus
 	 *        the HDFS file status
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 1371d21..f47423f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.configuration.ConfigConstants;
@@ -25,6 +26,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.HadoopFileSystemWrapper;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,16 +43,16 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
  * class is a wrapper class which encapsulates the original Hadoop HDFS API.
  *
- * If no file system class is specified, the wrapper will automatically load the Hadoop
+ * <p>If no file system class is specified, the wrapper will automatically load the Hadoop
  * distributed file system (HDFS).
  *
  */
 public final class HadoopFileSystem extends FileSystem implements HadoopFileSystemWrapper {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystem.class);
-	
+
 	private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";
-	
+
 	/**
 	 * Configuration value name for the DFS implementation name. Usually not specified in hadoop configurations.
 	 */
@@ -64,7 +66,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	/**
 	 * Creates a new DistributedFileSystem object to access HDFS, based on a class name
 	 * and picking up the configuration from the class path or the Flink configuration.
-	 * 
+	 *
 	 * @throws IOException
 	 *         throw if the required HDFS classes cannot be instantiated
 	 */
@@ -72,7 +74,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		// Create new Hadoop configuration object
 		this.conf = getHadoopConfiguration();
 
-		if(fsClass == null) {
+		if (fsClass == null) {
 			fsClass = getDefaultHDFSClass();
 		}
 
@@ -126,8 +128,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		}
 
 		// fall back to an older Hadoop version
-		if (fsClass == null)
-		{
+		if (fsClass == null) {
 			// first of all, check for a user-defined hdfs class
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '{}'.",
@@ -136,13 +137,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 			Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
 
-			if (classFromConfig != null)
-			{
+			if (classFromConfig != null) {
 				if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(classFromConfig)) {
 					fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
 
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName() );
+						LOG.debug("Loaded HDFS class '{}' as specified in configuration.", fsClass.getName());
 					}
 				}
 				else {
@@ -187,7 +187,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	}
 
 	/**
-	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured 
+	 * Returns a new Hadoop Configuration object using the path to the hadoop conf configured
 	 * in the main configuration (flink-conf.yaml).
 	 * This method is public because it's being used in the HadoopDataSource.
 	 */
@@ -215,15 +215,15 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		} else {
 			LOG.trace("{} configuration key for hdfs-site configuration file not set", ConfigConstants.HDFS_SITE_CONFIG);
 		}
-		
+
 		// 2. Approach environment variables
-		String[] possibleHadoopConfPaths = new String[4]; 
+		String[] possibleHadoopConfPaths = new String[4];
 		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
 		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-		
+
 		if (System.getenv("HADOOP_HOME") != null) {
-			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf";
-			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2
+			possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
+			possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // hadoop 2.2
 		}
 
 		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
@@ -245,10 +245,9 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		}
 		return retConf;
 	}
-	
+
 	private org.apache.hadoop.fs.FileSystem instantiateFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass)
-		throws IOException
-	{
+		throws IOException {
 		try {
 			return fsClass.newInstance();
 		}
@@ -266,7 +265,6 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		}
 	}
 
-
 	@Override
 	public Path getWorkingDirectory() {
 		return new Path(this.fs.getWorkingDirectory().toUri());
@@ -288,30 +286,30 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
 		return this.fs;
 	}
-	
+
 	@Override
 	public void initialize(URI path) throws IOException {
-		
+
 		// If the authority is not part of the path, we initialize with the fs.defaultFS entry.
 		if (path.getAuthority() == null) {
-			
+
 			String configEntry = this.conf.get("fs.defaultFS", null);
 			if (configEntry == null) {
 				// fs.default.name deprecated as of hadoop 2.2.0 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
 				configEntry = this.conf.get("fs.default.name", null);
 			}
-			
+
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("fs.defaultFS is set to {}", configEntry);
 			}
-			
+
 			if (configEntry == null) {
 				throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system (hdfs) configuration was registered, " +
 						"or that configuration did not contain an entry for the default file system (usually 'fs.defaultFS').");
 			} else {
 				try {
 					URI initURI = URI.create(configEntry);
-					
+
 					if (initURI.getAuthority() == null) {
 						throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system was registered, " +
 								"or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) " +
@@ -330,7 +328,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 					throw new IOException(getMissingAuthorityErrorPrefix(path) +
 							"The configuration contains an invalid file system default name (fs.default.name or fs.defaultFS): " + configEntry);
 				}
-			} 
+			}
 		}
 		else {
 			// Initialize file system
@@ -341,11 +339,11 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 				String message = "The (HDFS NameNode) host at '" + path.getAuthority()
 						+ "', specified by file path '" + path.toString() + "', cannot be resolved"
 						+ (e.getMessage() != null ? ": " + e.getMessage() : ".");
-				
+
 				if (path.getPort() == -1) {
 					message += " Hint: Have you forgotten a slash? (correct URI would be 'hdfs:///" + path.getAuthority() + path.getPath() + "' ?)";
 				}
-				
+
 				throw new IOException(message, e);
 			}
 			catch (Exception e) {
@@ -355,14 +353,13 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 			}
 		}
 	}
-	
+
 	private static String getMissingAuthorityErrorPrefix(URI path) {
 		return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS NameNode." +
-				" The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + "' or '" + 
+				" The attempt to use a default HDFS configuration, as specified in the '" + ConfigConstants.HDFS_DEFAULT_CONFIG + "' or '" +
 				ConfigConstants.HDFS_SITE_CONFIG + "' config parameter failed due to the following problem: ";
 	}
 
-
 	@Override
 	public FileStatus getFileStatus(final Path f) throws IOException {
 		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
@@ -371,8 +368,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
-	throws IOException
-	{
+			throws IOException {
 		if (!(file instanceof HadoopFileStatus)) {
 			throw new IOException("file is not an instance of DistributedFileStatus");
 		}
@@ -407,15 +403,12 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 
 	@Override
 	public HadoopDataOutputStream create(final Path f, final boolean overwrite, final int bufferSize,
-			final short replication, final long blockSize)
-	throws IOException
-	{
+			final short replication, final long blockSize) throws IOException {
 		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
 			new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
 		return new HadoopDataOutputStream(fdos);
 	}
 
-
 	@Override
 	public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException {
 		final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
@@ -437,7 +430,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		for (int i = 0; i < files.length; i++) {
 			files[i] = new HadoopFileStatus(hadoopFiles[i]);
 		}
-		
+
 		return files;
 	}
 
@@ -476,7 +469,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 //		}
 		clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
 
-		if(clazz != null && LOG.isDebugEnabled()) {
+		if (clazz != null && LOG.isDebugEnabled()) {
 			LOG.debug("Flink supports {} with the Hadoop file system wrapper, impl {}", scheme, clazz);
 		}
 		return clazz;
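
As a usage note, a hedged sketch of wiring this wrapper up by hand; the NameNode URI is a placeholder, and passing null relies on the default-HDFS-class fallback described in the constructor above:

	HadoopFileSystem fs = new HadoopFileSystem(null);
	fs.initialize(URI.create("hdfs://namenode.example.org:8020/"));

	FileStatus root = fs.getFileStatus(new Path("hdfs://namenode.example.org:8020/"));
	System.out.println("root is dir: " + root.isDir());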

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index 57eea6f..275e492 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -18,17 +18,6 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -40,6 +29,18 @@ import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
 import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Concrete implementation of the {@link FileSystem} base class for the MapR
  * file system. The class contains MapR specific code to initialize the
@@ -94,7 +95,7 @@ public final class MapRFileSystem extends FileSystem {
 
 	/**
 	 * Creates a new MapRFileSystem object to access the MapR file system.
-	 * 
+	 *
 	 * @throws IOException
 	 *             throw if the required MapR classes cannot be found
 	 */
@@ -180,8 +181,8 @@ public final class MapRFileSystem extends FileSystem {
 	}
 
 	/**
-	 * Retrieves the CLDB locations for the given MapR cluster name
-	 * 
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
 	 * @param authority
 	 *            the name of the MapR cluster
 	 * @return a list of CLDB locations

http://git-wip-us.apache.org/repos/asf/flink/blob/c9f659e0/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
index 58de3db..21c18bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStreamTest.java
@@ -35,6 +35,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Tests for the {@link HadoopDataInputStream}.
+ */
 public class HadoopDataInputStreamTest {
 
 	private FSDataInputStream verifyInputStream;


[6/7] flink git commit: [FLINK-7039] [build] Increase forkCountTestPackage for sudo-enabled TravisCI

Posted by ch...@apache.org.
[FLINK-7039] [build] Increase forkCountTestPackage for sudo-enabled TravisCI

The switch from the container-based to the sudo-enabled environment in
TravisCI has increased available memory from 4 GB to 7.5 GB, so use a
forkCount of 2 in all packages, including flink-test.

This closes #4222.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c59537ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c59537ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c59537ee

Branch: refs/heads/master
Commit: c59537eebc7251e3078553d84271597bb720b518
Parents: c9f659e
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Jun 29 06:42:51 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 4 12:33:32 2017 +0200

----------------------------------------------------------------------
 tools/travis_mvn_watchdog.sh | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c59537ee/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 9733866..f3c1699 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -44,9 +44,10 @@ SLEEP_TIME=20
 LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 # Maven command to run. We set the forkCount manually, because otherwise Maven sees too many cores
-# on the Travis VMs.
+# on the Travis VMs. Set forkCountTestPackage to 1 for the container-based environment (4 GiB memory)
+# and 2 for the sudo-enabled environment (7.5 GiB memory).
 MVN_LOGGING_OPTIONS="-Dlog.dir=${ARTIFACTS_DIR} -Dlog4j.configuration=file://$LOG4J_PROPERTIES -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn"
-MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=1 -Dmaven.javadoc.skip=true -B $PROFILE $MVN_LOGGING_OPTIONS clean install"
+MVN="mvn -Dflink.forkCount=2 -Dflink.forkCountTestPackage=2 -Dmaven.javadoc.skip=true -B $PROFILE $MVN_LOGGING_OPTIONS clean install"
 
 MVN_PID="${ARTIFACTS_DIR}/watchdog.mvn.pid"
 MVN_EXIT="${ARTIFACTS_DIR}/watchdog.mvn.exit"