You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:24 UTC

[15/15] flink git commit: [FLINK-6701] Activate strict checkstyle for flink-yarn

[FLINK-6701] Activate strict checkstyle for flink-yarn

This closes #3990.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77b0fb9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77b0fb9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77b0fb9f

Branch: refs/heads/master
Commit: 77b0fb9fe3656a5ae7e2ca3bbce28cfa5a0e247e
Parents: d313ac7
Author: zentol <ch...@apache.org>
Authored: Wed May 24 15:10:15 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri May 26 19:17:59 2017 +0200

----------------------------------------------------------------------
 .../yarn/AbstractYarnClusterDescriptor.java     | 165 +++++++-------
 ...bstractYarnFlinkApplicationMasterRunner.java |  21 +-
 .../flink/yarn/RegisteredYarnWorkerNode.java    |   6 +-
 .../main/java/org/apache/flink/yarn/Utils.java  |  72 +++---
 .../flink/yarn/YarnApplicationMasterRunner.java |  58 +++--
 .../apache/flink/yarn/YarnClusterClient.java    |  71 +++---
 .../apache/flink/yarn/YarnClusterClientV2.java  |   8 +-
 .../flink/yarn/YarnClusterDescriptor.java       |   2 +-
 .../flink/yarn/YarnClusterDescriptorV2.java     |   4 +-
 .../org/apache/flink/yarn/YarnConfigKeys.java   |  18 +-
 .../flink/yarn/YarnContainerInLaunch.java       |   3 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |  27 +--
 .../flink/yarn/YarnFlinkResourceManager.java    |  64 +++---
 .../apache/flink/yarn/YarnResourceManager.java  |  82 ++++---
 .../YarnResourceManagerCallbackHandler.java     |   8 +-
 .../flink/yarn/YarnTaskExecutorRunner.java      |   9 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  19 +-
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |  99 +++++----
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 220 +++++++++----------
 .../yarn/configuration/YarnConfigOptions.java   |   7 +-
 .../YarnHighAvailabilityServices.java           |  36 +--
 .../YarnIntraNonHaMasterServices.java           |  16 +-
 .../YarnPreConfiguredMasterNonHaServices.java   |  12 +-
 .../yarn/messages/ContainersAllocated.java      |  11 +-
 .../flink/yarn/messages/ContainersComplete.java |  12 +-
 flink-yarn/src/main/resources/log4j.properties  |   1 -
 .../apache/flink/yarn/ApplicationClient.scala   |   6 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   1 -
 .../org/apache/flink/yarn/YarnMessages.scala    |   5 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   2 +-
 .../yarn/TestingYarnFlinkResourceManager.java   |   6 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   |  29 ++-
 .../yarn/YarnApplicationMasterRunnerTest.java   |  15 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   |  27 ++-
 .../YarnIntraNonHaMasterServicesTest.java       |  25 ++-
 .../YarnPreConfiguredMasterHaServicesTest.java  |  29 +--
 .../messages/NotifyWhenResourcesRegistered.java |   3 +
 .../RequestNumberOfRegisteredResources.java     |   5 +-
 38 files changed, 611 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b9a4416..2315c70 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -109,6 +110,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	/**
 	 * If the user has specified a different number of slots, we store them here
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+	 * Contains configuration files and jar files.
+	 */
+	private Path sessionFilesDir;
+
+	/**
+	 * If the user has specified a different number of slots, we store them here.
 	 */
 	private int slots = -1;
 
@@ -128,7 +136,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private String dynamicPropertiesEncoded;
 
-	/** Lazily initialized list of files to ship */
+	/** Lazily initialized list of files to ship. */
 	protected List<File> shipFiles = new LinkedList<>();
 
 	private org.apache.flink.configuration.Configuration flinkConfiguration;
@@ -140,18 +148,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private String zookeeperNamespace;
 
 	/** Optional Jar file to include in the system class loader of all application nodes
-	 * (for per-job submission) */
+	 * (for per-job submission). */
 	private final Set<File> userJarFiles = new HashSet<>();
 
 	private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
 	public AbstractYarnClusterDescriptor() {
 		// for unit tests only
-		if(System.getenv("IN_TESTS") != null) {
+		if (System.getenv("IN_TESTS") != null) {
 			try {
 				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
 			} catch (Throwable t) {
-				throw new RuntimeException("Error",t);
+				throw new RuntimeException("Error", t);
 			}
 		}
 
@@ -183,17 +191,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	protected abstract Class<?> getApplicationMasterClass();
 
 	public void setJobManagerMemory(int memoryMb) {
-		if(memoryMb < MIN_JM_MEMORY) {
+		if (memoryMb < MIN_JM_MEMORY) {
 			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
-				+ "of " + MIN_JM_MEMORY+ " MB");
+				+ "of " + MIN_JM_MEMORY + " MB");
 		}
 		this.jobManagerMemoryMb = memoryMb;
 	}
 
 	public void setTaskManagerMemory(int memoryMb) {
-		if(memoryMb < MIN_TM_MEMORY) {
+		if (memoryMb < MIN_TM_MEMORY) {
 			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
-				+ "of " + MIN_TM_MEMORY+ " MB");
+				+ "of " + MIN_TM_MEMORY + " MB");
 		}
 		this.taskManagerMemoryMb = memoryMb;
 	}
@@ -209,7 +217,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	public void setTaskManagerSlots(int slots) {
-		if(slots <= 0) {
+		if (slots <= 0) {
 			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
 		}
 		this.slots = slots;
@@ -224,7 +232,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	public void setLocalJarPath(Path localJarPath) {
-		if(!localJarPath.toString().endsWith("jar")) {
+		if (!localJarPath.toString().endsWith("jar")) {
 			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
 		}
 		this.flinkJarPath = localJarPath;
@@ -239,7 +247,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	public void setTaskManagerCount(int tmCount) {
-		if(tmCount < 1) {
+		if (tmCount < 1) {
 			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
 		}
 		this.taskManagerCount = tmCount;
@@ -253,7 +261,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		for (File shipFile: shipFiles) {
 			// remove uberjar from ship list (by default everything in the lib/ folder is added to
 			// the list of files to ship, but we handle the uberjar separately.
-			if(!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
+			if (!(shipFile.getName().startsWith("flink-dist") && shipFile.getName().endsWith("jar"))) {
 				this.shipFiles.add(shipFile);
 			}
 		}
@@ -274,7 +282,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			return false;
 		}
 		try {
-			for(URL jarFile : requiredJarFiles) {
+			for (URL jarFile : requiredJarFiles) {
 				if (!userJarFiles.contains(new File(jarFile.toURI()))) {
 					return false;
 				}
@@ -303,21 +311,20 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return this.dynamicPropertiesEncoded;
 	}
 
-
 	private void isReadyForDeployment() throws YarnDeploymentException {
-		if(taskManagerCount <= 0) {
+		if (taskManagerCount <= 0) {
 			throw new YarnDeploymentException("Taskmanager count must be positive");
 		}
-		if(this.flinkJarPath == null) {
+		if (this.flinkJarPath == null) {
 			throw new YarnDeploymentException("The Flink jar path is null");
 		}
-		if(this.configurationDirectory == null) {
+		if (this.configurationDirectory == null) {
 			throw new YarnDeploymentException("Configuration directory not set");
 		}
-		if(this.flinkConfigurationPath == null) {
+		if (this.flinkConfigurationPath == null) {
 			throw new YarnDeploymentException("Configuration path not set");
 		}
-		if(this.flinkConfiguration == null) {
+		if (this.flinkConfiguration == null) {
 			throw new YarnDeploymentException("Flink configuration object has not been set");
 		}
 
@@ -337,7 +344,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// check if required Hadoop environment variables are set. If not, warn user
-		if(System.getenv("HADOOP_CONF_DIR") == null &&
+		if (System.getenv("HADOOP_CONF_DIR") == null &&
 			System.getenv("YARN_CONF_DIR") == null) {
 			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
 				"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
@@ -346,8 +353,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	private static boolean allocateResource(int[] nodeManagers, int toAllocate) {
-		for(int i = 0; i < nodeManagers.length; i++) {
-			if(nodeManagers[i] >= toAllocate) {
+		for (int i = 0; i < nodeManagers.length; i++) {
+			if (nodeManagers[i] >= toAllocate) {
 				nodeManagers[i] -= toAllocate;
 				return true;
 			}
@@ -372,7 +379,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	/**
-	 * Gets a Hadoop Yarn client
+	 * Gets a Hadoop Yarn client.
 	 * @return Returns a YarnClient which has to be shutdown manually
 	 */
 	protected YarnClient getYarnClient() {
@@ -420,7 +427,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient deploy() {
 		try {
-			if(UserGroupInformation.isSecurityEnabled()) {
+			if (UserGroupInformation.isSecurityEnabled()) {
 				// note: UGI::hasKerberosCredentials inaccurately reports false
 				// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
 				// so we check only in ticket cache scenario.
@@ -453,7 +460,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		final YarnClient yarnClient = getYarnClient();
 
-
 		// ------------------ Check if the specified queue exists --------------------
 
 		try {
@@ -477,9 +483,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			} else {
 				LOG.debug("The YARN cluster does not have any queues configured");
 			}
-		} catch(Throwable e) {
+		} catch (Throwable e) {
 			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
-			if(LOG.isDebugEnabled()) {
+			if (LOG.isDebugEnabled()) {
 				LOG.debug("Error details", e);
 			}
 		}
@@ -495,7 +501,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
 		// all allocations below this value are automatically set to this value.
 		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
-		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
+		if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
 			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
 				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
 				"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
@@ -503,10 +509,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// set the memory to minAllocationMB to do the next checks correctly
-		if(jobManagerMemoryMb < yarnMinAllocationMB) {
+		if (jobManagerMemoryMb < yarnMinAllocationMB) {
 			jobManagerMemoryMb =  yarnMinAllocationMB;
 		}
-		if(taskManagerMemoryMb < yarnMinAllocationMB) {
+		if (taskManagerMemoryMb < yarnMinAllocationMB) {
 			taskManagerMemoryMb =  yarnMinAllocationMB;
 		}
 
@@ -515,56 +521,56 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
 
 		Resource maxRes = appResponse.getMaximumResourceCapability();
-		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
-		if(jobManagerMemoryMb > maxRes.getMemory() ) {
+		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
+		if (jobManagerMemoryMb > maxRes.getMemory()) {
 			failSessionDuringDeployment(yarnClient, yarnApplication);
 			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
-				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
+				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
 		}
 
-		if(taskManagerMemoryMb > maxRes.getMemory() ) {
+		if (taskManagerMemoryMb > maxRes.getMemory()) {
 			failSessionDuringDeployment(yarnClient, yarnApplication);
 			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
-				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
+				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
 		}
 
-		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
+		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
 			"connecting from the beginning because the resources are currently not available in the cluster. " +
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
 		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
-				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
+				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
 
 		}
-		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
+		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
 			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
 		}
-		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
+		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
 			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
-				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
+				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
 		}
 
 		// ----------------- check if the requested containers fit into the cluster.
 
 		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
 		// first, allocate the jobManager somewhere.
-		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
+		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
 			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
 				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
-				Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
+				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
 		}
 		// allocate TaskManagers
-		for(int i = 0; i < taskManagerCount; i++) {
-			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
+		for (int i = 0; i < taskManagerCount; i++) {
+			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
 				LOG.warn("There is not enough memory available in the YARN cluster. " +
 					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
 					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
 					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
-					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + NOTE_RSC );
+					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
 			}
 		}
 
@@ -669,7 +675,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// ship list that enables reuse of resources for task manager containers
 		StringBuilder envShipFileList = new StringBuilder();
 
-		// upload and register ship files	
+		// upload and register ship files
 		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
 
 		List<String> userClassPaths;
@@ -752,9 +758,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		Path remoteKrb5Path = null;
 		Path remoteYarnSiteXmlPath = null;
 		boolean hasKrb5 = false;
-		if(System.getenv("IN_TESTS") != null) {
+		if (System.getenv("IN_TESTS") != null) {
 			String krb5Config = System.getProperty("java.security.krb5.conf");
-			if(krb5Config != null && krb5Config.length() != 0) {
+			if (krb5Config != null && krb5Config.length() != 0) {
 				File krb5 = new File(krb5Config);
 				LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
 				LocalResource krb5ConfResource = Records.newRecord(LocalResource.class);
@@ -762,7 +768,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				remoteKrb5Path = Utils.setupLocalResource(fs, appId.toString(), krb5ConfPath, krb5ConfResource, fs.getHomeDirectory());
 				localResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
 
-				File f = new File(System.getenv("YARN_CONF_DIR"),Utils.YARN_SITE_FILE_NAME);
+				File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
 				LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
 				LocalResource yarnConfResource = Records.newRecord(LocalResource.class);
 				Path yarnSitePath = new Path(f.getAbsolutePath());
@@ -777,7 +783,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		LocalResource keytabResource = null;
 		Path remotePathKeytab = null;
 		String keytab = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
-		if(keytab != null) {
+		if (keytab != null) {
 			LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
 			keytabResource = Records.newRecord(LocalResource.class);
 			Path keytabPath = new Path(keytab);
@@ -787,7 +793,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
 
-		if ( UserGroupInformation.isSecurityEnabled() && keytab == null ) {
+		if (UserGroupInformation.isSecurityEnabled() && keytab == null) {
 			//set tokens only when keytab is not provided
 			LOG.info("Adding delegation token to the AM container..");
 			Utils.setTokensFor(amContainer, paths, conf);
@@ -806,7 +812,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// set Flink on YARN internal configuration values
 		appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
 		appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
-		appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString() );
+		appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
@@ -818,19 +824,19 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
 		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
 
-		if(keytabResource != null) {
-			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString() );
+		if (keytabResource != null) {
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
 			String principal = flinkConfiguration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
-			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal );
+			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
 		}
 
 		//To support Yarn Secure Integration Test Scenario
-		if(remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+		if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
-			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString() );
+			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
 		}
 
-		if(dynamicPropertiesEncoded != null) {
+		if (dynamicPropertiesEncoded != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
 		}
 
@@ -845,9 +851,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		capability.setVirtualCores(1);
 
 		String name;
-		if(customName == null) {
+		if (customName == null) {
 			name = "Flink session with " + taskManagerCount + " TaskManagers";
-			if(detached) {
+			if (detached) {
 				name += " (detached)";
 			}
 		} else {
@@ -858,7 +864,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appContext.setApplicationType("Apache Flink");
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
-		if(yarnQueue != null) {
+		if (yarnQueue != null) {
 			appContext.setQueue(yarnQueue);
 		}
 
@@ -874,7 +880,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		final long startTime = System.currentTimeMillis();
 		ApplicationReport report;
 		YarnApplicationState lastAppState = YarnApplicationState.NEW;
-		loop: while( true ) {
+		loop: while (true) {
 			try {
 				report = yarnClient.getApplicationReport(appId);
 			} catch (IOException e) {
@@ -899,7 +905,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					if (appState != lastAppState) {
 						LOG.info("Deploying cluster, current state " + appState);
 					}
-					if(System.currentTimeMillis() - startTime > 60000) {
+					if (System.currentTimeMillis() - startTime > 60000) {
 						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
 					}
 
@@ -922,7 +928,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 		return report;
 	}
-	
+
 	private static List<String> uploadAndRegisterFiles(
 			Collection<File> shipFiles,
 			FileSystem fs,
@@ -971,7 +977,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/**
 	 * Kills YARN application and stops YARN client.
 	 *
-	 * Use this method to kill the App before it has been properly deployed
+	 * <p>Use this method to kill the App before it has been properly deployed
 	 */
 	private void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnApplication) {
 		LOG.info("Killing YARN application");
@@ -986,11 +992,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		yarnClient.stop();
 	}
 
-
 	private static class ClusterResourceDescription {
-		final public int totalFreeMemory;
-		final public int containerLimit;
-		final public int[] nodeManagersFree;
+		public final int totalFreeMemory;
+		public final int containerLimit;
+		public final int[] nodeManagersFree;
 
 		public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
 			this.totalFreeMemory = totalFreeMemory;
@@ -1006,12 +1011,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		int containerLimit = 0;
 		int[] nodeManagersFree = new int[nodes.size()];
 
-		for(int i = 0; i < nodes.size(); i++) {
+		for (int i = 0; i < nodes.size(); i++) {
 			NodeReport rep = nodes.get(i);
-			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0);
 			nodeManagersFree[i] = free;
 			totalFreeMemory += free;
-			if(free > containerLimit) {
+			if (free > containerLimit) {
 				containerLimit = free;
 			}
 		}
@@ -1060,7 +1065,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	public void setName(String name) {
-		if(name == null) {
+		if (name == null) {
 			throw new IllegalArgumentException("The passed name is null");
 		}
 		customName = name;
@@ -1098,9 +1103,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
 	 * supports various methods which, depending on the Hadoop version, may or may not be supported.
 	 *
-	 * If an unsupported method is invoked, nothing happens.
+	 * <p>If an unsupported method is invoked, nothing happens.
 	 *
-	 * Currently three methods are proxied:
+	 * <p>Currently three methods are proxied:
 	 * - setApplicationTags (>= 2.4.0)
 	 * - setAttemptFailuresValidityInterval (>= 2.6.0)
 	 * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
@@ -1302,11 +1307,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		if (hasLogback || hasLog4j) {
 			logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
 
-			if(hasLogback) {
+			if (hasLogback) {
 				logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
 			}
 
-			if(hasLog4j) {
+			if (hasLog4j) {
 				logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
 			}
 		}
@@ -1345,7 +1350,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	/**
-	 * Creates a YarnClusterClient; may be overriden in tests
+	 * Creates a YarnClusterClient; may be overriden in tests.
 	 */
 	protected YarnClusterClient createYarnClusterClient(
 			AbstractYarnClusterDescriptor descriptor,

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
index 4b24f42..8bf6a2e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnFlinkApplicationMasterRunner.java
@@ -27,10 +27,10 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,21 +43,20 @@ import java.util.concurrent.Callable;
  * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
  * and {@link YarnResourceManager}.
  *
- * The JobMasters handles Flink job execution, while the YarnResourceManager handles container
+ * <p>The JobMasters handles Flink job execution, while the YarnResourceManager handles container
  * allocation and failure detection.
  */
 public abstract class AbstractYarnFlinkApplicationMasterRunner {
 
-	/** Logger */
 	protected static final Logger LOG = LoggerFactory.getLogger(AbstractYarnFlinkApplicationMasterRunner.class);
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	protected static final Map<String, String> ENV = System.getenv();
 
-	/** The exit code returned if the initialization of the application master failed */
+	/** The exit code returned if the initialization of the application master failed. */
 	protected static final int INIT_ERROR_EXIT_CODE = 31;
 
-	/** The host name passed by env */
+	/** The host name passed by env. */
 	protected String appMasterHostname;
 
 	/**
@@ -87,7 +86,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 			LOG.info("Remote keytab principal obtained {}", remoteKeytabPrincipal);
 
 			String keytabPath = null;
-			if(remoteKeytabPath != null) {
+			if (remoteKeytabPath != null) {
 				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 				keytabPath = f.getAbsolutePath();
 				LOG.debug("Keytab path: {}", keytabPath);
@@ -96,7 +95,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-					currentUser.getShortUserName(), yarnClientUsername );
+					currentUser.getShortUserName(), yarnClientUsername);
 
 			// Flink configuration
 			final Map<String, String> dynamicProperties =
@@ -122,7 +121,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 			}
 
 			SecurityUtils.SecurityConfiguration sc;
-			if(hadoopConfiguration != null) {
+			if (hadoopConfiguration != null) {
 				sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
 			} else {
 				sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
@@ -170,7 +169,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 	/**
 	 * @param baseDirectory  The working directory
 	 * @param additional Additional parameters
-	 * 
+	 *
 	 * @return The configuration to be used by the TaskExecutors.
 	 */
 	private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional) {
@@ -194,7 +193,7 @@ public abstract class AbstractYarnFlinkApplicationMasterRunner {
 			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 		}
 
-		// if the user has set the deprecated YARN-specific config keys, we add the 
+		// if the user has set the deprecated YARN-specific config keys, we add the
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index cb2f40a..5f059bf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -19,8 +19,8 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
 import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
@@ -30,10 +30,10 @@ import static java.util.Objects.requireNonNull;
  */
 public class RegisteredYarnWorkerNode implements ResourceIDRetrievable {
 
-	/** The container on which the worker runs */
+	/** The container on which the worker runs. */
 	private final Container yarnContainer;
 
-	/** The resource id associated with this worker type */
+	/** The resource id associated with this worker type. */
 	private final ResourceID resourceID;
 
 	public RegisteredYarnWorkerNode(Container yarnContainer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 60f7204..698b69e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -18,25 +18,9 @@
 
 package org.apache.flink.yarn;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.util.Records;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.configuration.ConfigConstants;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -57,6 +42,20 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 
@@ -64,20 +63,20 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
  * Utility class that provides helper methods to work with Apache Hadoop YARN.
  */
 public final class Utils {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
-	/** Keytab file name populated in YARN container */
+	/** Keytab file name populated in YARN container. */
 	public static final String KEYTAB_FILE_NAME = "krb5.keytab";
 
-	/** KRB5 file name populated in YARN container for secure IT run */
+	/** KRB5 file name populated in YARN container for secure IT run. */
 	public static final String KRB5_FILE_NAME = "krb5.conf";
 
-	/** Yarn site xml file name populated in YARN container for secure IT run */
+	/** Yarn site xml file name populated in YARN container for secure IT run. */
 	public static final String YARN_SITE_FILE_NAME = "yarn-site.xml";
 
 	/**
-	 * See documentation
+	 * See documentation.
 	 */
 	public static int calculateHeapSize(int memory, org.apache.flink.configuration.Configuration conf) {
 
@@ -102,14 +101,13 @@ public final class Utils {
 				+ "' is higher (" + minCutoff + ") than the requested amount of memory " + memory);
 		}
 
-		int heapLimit = (int)((float)memory * memoryCutoffRatio);
+		int heapLimit = (int) ((float) memory * memoryCutoffRatio);
 		if (heapLimit < minCutoff) {
 			heapLimit = minCutoff;
 		}
 		return memory - heapLimit;
 	}
 
-
 	public static void setupYarnClassPath(Configuration conf, Map<String, String> appMasterEnv) {
 		addToEnvironment(
 			appMasterEnv,
@@ -123,9 +121,7 @@ public final class Utils {
 		}
 	}
 
-
 	/**
-	 * 
 	 * @return Path to remote file (usually hdfs)
 	 * @throws IOException
 	 */
@@ -165,7 +161,7 @@ public final class Utils {
 		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
 
 		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
-		for(Token<? extends TokenIdentifier> token : usrTok) {
+		for (Token<? extends TokenIdentifier> token : usrTok) {
 			final Text id = new Text(token.getIdentifier());
 			LOG.info("Adding user token " + id + " with " + token);
 			credentials.addToken(id, token);
@@ -173,7 +169,7 @@ public final class Utils {
 		try (DataOutputBuffer dob = new DataOutputBuffer()) {
 			credentials.writeTokenStorageToStream(dob);
 
-			if(LOG.isDebugEnabled()) {
+			if (LOG.isDebugEnabled()) {
 				LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
 			}
 
@@ -193,7 +189,7 @@ public final class Utils {
 				// Intended call: HBaseConfiguration.addHbaseResources(conf);
 				Class
 						.forName("org.apache.hadoop.hbase.HBaseConfiguration")
-						.getMethod("addHbaseResources", Configuration.class )
+						.getMethod("addHbaseResources", Configuration.class)
 						.invoke(null, conf);
 				// ----
 
@@ -220,7 +216,7 @@ public final class Utils {
 
 				credentials.addToken(token.getService(), token);
 				LOG.info("Added HBase Kerberos security token to credentials.");
-			} catch ( ClassNotFoundException
+			} catch (ClassNotFoundException
 					| NoSuchMethodException
 					| IllegalAccessException
 					| InvocationTargetException e) {
@@ -231,7 +227,7 @@ public final class Utils {
 	}
 
 	/**
-	 * Copied method from org.apache.hadoop.yarn.util.Apps
+	 * Copied method from org.apache.hadoop.yarn.util.Apps.
 	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
 	 * by https://issues.apache.org/jira/browse/YARN-1931
 	 */
@@ -262,8 +258,8 @@ public final class Utils {
 	 */
 	public static Map<String, String> getEnvironmentVariables(String envPrefix, org.apache.flink.configuration.Configuration flinkConfiguration) {
 		Map<String, String> result  = new HashMap<>();
-		for(Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
-			if(entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
+		for (Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
+			if (entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
 				// remove prefix
 				String key = entry.getKey().substring(envPrefix.length());
 				result.put(key, entry.getValue());
@@ -347,7 +343,7 @@ public final class Utils {
 
 		//register keytab
 		LocalResource keytabResource = null;
-		if(remoteKeytabPath != null) {
+		if (remoteKeytabPath != null) {
 			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
 			keytabResource = Records.newRecord(LocalResource.class);
 			Path keytabPath = new Path(remoteKeytabPath);
@@ -359,7 +355,7 @@ public final class Utils {
 		LocalResource yarnConfResource = null;
 		LocalResource krb5ConfResource = null;
 		boolean hasKrb5 = false;
-		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+		if (remoteYarnConfPath != null && remoteKrb5Path != null) {
 			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
 			yarnConfResource = Records.newRecord(LocalResource.class);
 			Path yarnConfPath = new Path(remoteYarnConfPath);
@@ -405,12 +401,12 @@ public final class Utils {
 		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
 
 		//To support Yarn Secure Integration Test Scenario
-		if(yarnConfResource != null && krb5ConfResource != null) {
+		if (yarnConfResource != null && krb5ConfResource != null) {
 			taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
 			taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
 		}
 
-		if(keytabResource != null) {
+		if (keytabResource != null) {
 			taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
 		}
 
@@ -450,7 +446,7 @@ public final class Utils {
 
 		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
 
-		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+		if (remoteKeytabPath != null && remoteKeytabPrincipal != null) {
 			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
 			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 64417f6..a424740 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -48,19 +44,17 @@ import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
-import scala.Some;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.File;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -69,41 +63,43 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.Some;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.yarn.Utils.require;
 
 /**
  * This class is the executable entry point for the YARN application master.
  * It starts actor system and the actors for {@link JobManager}
  * and {@link YarnFlinkResourceManager}.
- * 
- * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
+ *
+ * <p>The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container
  * allocation and failure detection.
  */
 public class YarnApplicationMasterRunner {
 
-	/** Logger */
 	protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
 
 	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
-	 * before they quit */
+	 * before they quit. */
 	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
-	/** The exit code returned if the initialization of the application master failed */
+	/** The exit code returned if the initialization of the application master failed. */
 	private static final int INIT_ERROR_EXIT_CODE = 31;
 
-	/** The exit code returned if the process exits because a critical actor died */
+	/** The exit code returned if the process exits because a critical actor died. */
 	private static final int ACTOR_DIED_EXIT_CODE = 32;
 
-
 	// ------------------------------------------------------------------------
 	//  Program entry point
 	// ------------------------------------------------------------------------
 
 	/**
-	 * The entry point for the YARN application master. 
+	 * The entry point for the YARN application master.
 	 *
 	 * @param args The command line arguments.
 	 */
@@ -144,7 +140,7 @@ public class YarnApplicationMasterRunner {
 			LOG.info("remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
 
 			String keytabPath = null;
-			if(remoteKeytabPath != null) {
+			if (remoteKeytabPath != null) {
 				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 				keytabPath = f.getAbsolutePath();
 				LOG.debug("keytabPath: {}", keytabPath);
@@ -153,7 +149,7 @@ public class YarnApplicationMasterRunner {
 			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
 
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-					currentUser.getShortUserName(), yarnClientUsername );
+					currentUser.getShortUserName(), yarnClientUsername);
 
 			// Flink configuration
 			final Map<String, String> dynamicProperties =
@@ -172,7 +168,7 @@ public class YarnApplicationMasterRunner {
 
 			//To support Yarn Secure Integration Test Scenario
 			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if(krb5Conf.exists() && krb5Conf.canRead()) {
+			if (krb5Conf.exists() && krb5Conf.canRead()) {
 				String krb5Path = krb5Conf.getAbsolutePath();
 				LOG.info("KRB5 Conf: {}", krb5Path);
 				hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
@@ -181,7 +177,7 @@ public class YarnApplicationMasterRunner {
 			}
 
 			SecurityUtils.SecurityConfiguration sc;
-			if(hadoopConfiguration != null) {
+			if (hadoopConfiguration != null) {
 				sc = new SecurityUtils.SecurityConfiguration(flinkConfig, hadoopConfiguration);
 			} else {
 				sc = new SecurityUtils.SecurityConfiguration(flinkConfig);
@@ -298,7 +294,6 @@ public class YarnApplicationMasterRunner {
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
 
-
 			// ----------------- (2) start the actor system -------------------
 
 			// try to start the actor system, JobManager and JobManager actor system
@@ -314,7 +309,6 @@ public class YarnApplicationMasterRunner {
 
 			LOG.info("Actor system bound to hostname {}.", akkaHostname);
 
-
 			// ---- (3) Generate the configuration for the TaskManagers
 
 			final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
@@ -326,7 +320,6 @@ public class YarnApplicationMasterRunner {
 				taskManagerParameters, taskManagerConfig,
 				currDir, getTaskManagerClass(), LOG);
 
-
 			// ---- (4) start the actors and components in this order:
 
 			// 1) JobManager & Archive (in non-HA case, the leader service takes this)
@@ -360,7 +353,6 @@ public class YarnApplicationMasterRunner {
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
-
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
@@ -390,7 +382,7 @@ public class YarnApplicationMasterRunner {
 				webMonitorURL,
 				taskManagerParameters,
 				taskManagerContext,
-				numInitialTaskManagers, 
+				numInitialTaskManagers,
 				LOG);
 
 			ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
@@ -467,7 +459,6 @@ public class YarnApplicationMasterRunner {
 		return 0;
 	}
 
-
 	// ------------------------------------------------------------------------
 	//  For testing, this allows to override the actor classes used for
 	//  JobManager and the archive of completed jobs
@@ -494,10 +485,11 @@ public class YarnApplicationMasterRunner {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * 
-	 * @param baseDirectory
-	 * @param additional
-	 * 
+	 * Reads the global configuration from the given directory and adds the given parameters to it.
+	 *
+	 * @param baseDirectory directory to load the configuration from
+	 * @param additional additional parameters to be included in the configuration
+	 *
 	 * @return The configuration to be used by the TaskManagers.
 	 */
 	@SuppressWarnings("deprecation")
@@ -522,7 +514,7 @@ public class YarnApplicationMasterRunner {
 			configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
 		}
 
-		// if the user has set the deprecated YARN-specific config keys, we add the 
+		// if the user has set the deprecated YARN-specific config keys, we add the
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 7042f99..a435ef7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -15,14 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
+package org.apache.flink.yarn;
 
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.ClusterClient;
@@ -39,6 +34,12 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -47,10 +48,6 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.IOException;
@@ -59,6 +56,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * Java representation of a running Flink cluster within YARN.
  */
@@ -84,7 +86,7 @@ public class YarnClusterClient extends ClusterClient {
 
 	private boolean isConnected = true;
 
-	/** Indicator whether this cluster has just been created */
+	/** Indicator whether this cluster has just been created. */
 	private final boolean newlyCreatedCluster;
 
 	/**
@@ -128,7 +130,7 @@ public class YarnClusterClient extends ClusterClient {
 	}
 
 	/**
-	 * Disconnect from the Yarn cluster
+	 * Disconnect from the Yarn cluster.
 	 */
 	public void disconnect() {
 
@@ -136,7 +138,7 @@ public class YarnClusterClient extends ClusterClient {
 			return;
 		}
 
-		if(!isConnected) {
+		if (!isConnected) {
 			throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
 		}
 
@@ -151,7 +153,7 @@ public class YarnClusterClient extends ClusterClient {
 		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);
-		} catch(InterruptedException e) {
+		} catch (InterruptedException e) {
 			LOG.warn("Shutdown of the polling runner was interrupted", e);
 			Thread.currentThread().interrupt();
 		}
@@ -159,7 +161,6 @@ public class YarnClusterClient extends ClusterClient {
 		isConnected = false;
 	}
 
-
 	// -------------------------- Interaction with the cluster ------------------------
 
 	/*
@@ -209,7 +210,7 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	public String getWebInterfaceURL() {
 		// there seems to be a difference between HD 2.2.0 and 2.6.0
-		if(!trackingURL.startsWith("http://")) {
+		if (!trackingURL.startsWith("http://")) {
 			return "http://" + trackingURL;
 		} else {
 			return trackingURL;
@@ -226,10 +227,10 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	@Override
 	public GetClusterStatusResponse getClusterStatus() {
-		if(!isConnected) {
+		if (!isConnected) {
 			throw new IllegalStateException("The cluster is not connected to the cluster.");
 		}
-		if(hasBeenShutdown()) {
+		if (hasBeenShutdown()) {
 			throw new IllegalStateException("The cluster has already been shutdown.");
 		}
 
@@ -245,17 +246,17 @@ public class YarnClusterClient extends ClusterClient {
 	}
 
 	public ApplicationStatus getApplicationStatus() {
-		if(!isConnected) {
+		if (!isConnected) {
 			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
 		}
 		ApplicationReport lastReport = null;
-		if(pollingRunner == null) {
+		if (pollingRunner == null) {
 			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster." +
 					"The system might be in an erroneous state");
 		} else {
 			lastReport = pollingRunner.getLastReport();
 		}
-		if(lastReport == null) {
+		if (lastReport == null) {
 			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far." +
 					"The system might be in an erroneous state");
 			return ApplicationStatus.UNKNOWN;
@@ -264,7 +265,7 @@ public class YarnClusterClient extends ClusterClient {
 			ApplicationStatus status =
 				(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ?
 					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
-			if(status != ApplicationStatus.SUCCEEDED) {
+			if (status != ApplicationStatus.SUCCEEDED) {
 				LOG.warn("YARN reported application state {}", appState);
 				LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
 			}
@@ -275,17 +276,17 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	public List<String> getNewMessages() {
 
-		if(hasBeenShutdown()) {
+		if (hasBeenShutdown()) {
 			throw new RuntimeException("The YarnClusterClient has already been stopped");
 		}
 
-		if(!isConnected) {
+		if (!isConnected) {
 			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
 		}
 
 		List<String> ret = new ArrayList<String>();
 		// get messages from ApplicationClient (locally)
-		while(true) {
+		while (true) {
 			Object result;
 			try {
 				Future<Object> response =
@@ -294,23 +295,23 @@ public class YarnClusterClient extends ClusterClient {
 						YarnMessages.getLocalGetYarnMessage(),
 						new Timeout(akkaDuration));
 				result = Await.result(response, akkaDuration);
-			} catch(Exception ioe) {
+			} catch (Exception ioe) {
 				LOG.warn("Error retrieving the YARN messages locally", ioe);
 				break;
 			}
 
-			if(!(result instanceof Option)) {
+			if (!(result instanceof Option)) {
 				throw new RuntimeException("LocalGetYarnMessage requires a response of type " +
 						"Option. Instead the response is of type " + result.getClass() + ".");
 			} else {
 				Option messageOption = (Option) result;
 				LOG.debug("Received message option {}", messageOption);
-				if(messageOption.isEmpty()) {
+				if (messageOption.isEmpty()) {
 					break;
 				} else {
 					Object obj = messageOption.get();
 
-					if(obj instanceof InfoMessage) {
+					if (obj instanceof InfoMessage) {
 						InfoMessage msg = (InfoMessage) obj;
 						ret.add("[" + msg.date() + "] " + msg.message());
 					} else {
@@ -339,7 +340,7 @@ public class YarnClusterClient extends ClusterClient {
 	}
 
 	/**
-	 * Shuts down the Yarn application
+	 * Shuts down the Yarn application.
 	 */
 	public void shutdownCluster() {
 
@@ -365,7 +366,7 @@ public class YarnClusterClient extends ClusterClient {
 							"Flink YARN Client requested shutdown"),
 					new Timeout(akkaDuration));
 			Await.ready(response, akkaDuration);
-		} catch(Exception e) {
+		} catch (Exception e) {
 			LOG.warn("Error while stopping YARN cluster.", e);
 		}
 
@@ -385,7 +386,7 @@ public class YarnClusterClient extends ClusterClient {
 		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);
-		} catch(InterruptedException e) {
+		} catch (InterruptedException e) {
 			LOG.warn("Shutdown of the polling runner was interrupted", e);
 			Thread.currentThread().interrupt();
 		}
@@ -420,7 +421,6 @@ public class YarnClusterClient extends ClusterClient {
 		return hasBeenShutDown.get();
 	}
 
-
 	private class ClientShutdownHook extends Thread {
 		@Override
 		public void run() {
@@ -446,14 +446,13 @@ public class YarnClusterClient extends ClusterClient {
 		private final Object lock = new Object();
 		private ApplicationReport lastReport;
 
-
 		public PollingThread(YarnClient yarnClient, ApplicationId appId) {
 			this.yarnClient = yarnClient;
 			this.appId = appId;
 		}
 
 		public void stopRunner() {
-			if(!running.get()) {
+			if (!running.get()) {
 				LOG.warn("Polling thread was already stopped");
 			}
 			running.set(false);
@@ -484,7 +483,7 @@ public class YarnClusterClient extends ClusterClient {
 					stopRunner();
 				}
 			}
-			if(running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
+			if (running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
 				// == if the polling thread is still running but the yarn client is stopped.
 				LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index 33d5987..f58e6aa 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.yarn;
 
 import org.apache.flink.api.common.JobSubmissionResult;
@@ -23,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -37,7 +39,7 @@ import java.util.List;
 
 /**
  * Java representation of a running Flink job on YARN.
- * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster, 
+ * Since flip-6, a flink job will be run as a yarn job by default, each job has a jobmaster,
  * so this class will be used as a client to communicate with yarn and start the job on yarn.
  */
 public class YarnClusterClientV2 extends ClusterClient {
@@ -95,7 +97,7 @@ public class YarnClusterClientV2 extends ClusterClient {
 			if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
 				appId = report.getApplicationId();
 				trackingURL = report.getTrackingUrl();
-				logAndSysout("Please refer to " + getWebInterfaceURL() 
+				logAndSysout("Please refer to " + getWebInterfaceURL()
 						+ " for the running status of job " +  jobGraph.getJobID().toString());
 				//TODO: not support attach mode now
 				return new JobSubmissionResult(jobGraph.getJobID());
@@ -112,7 +114,7 @@ public class YarnClusterClientV2 extends ClusterClient {
 	@Override
 	public String getWebInterfaceURL() {
 		// there seems to be a difference between HD 2.2.0 and 2.6.0
-		if(!trackingURL.startsWith("http://")) {
+		if (!trackingURL.startsWith("http://")) {
 			return "http://" + trackingURL;
 		} else {
 			return trackingURL;

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 5f745b2..db5206a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.yarn;
 
+package org.apache.flink.yarn;
 
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index e3bd944..b22b163 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -15,13 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.yarn;
 
+package org.apache.flink.yarn;
 
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the new application master for a job under flip-6.
  * This implementation is now however tricky, since YarnClusterDescriptorV2 is related YarnClusterClientV2, but AbstractYarnClusterDescriptor is related
- * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor<YarnClusterClientV2>.
+ * to YarnClusterClient. We should let YarnClusterDescriptorV2 implements ClusterDescriptor&lt;YarnClusterClientV2&gt;.
  * However, in order to use the code in AbstractYarnClusterDescriptor for setting environments and so on, we make YarnClusterDescriptorV2 as now.
  */
 public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index 7c9c7a7..03d94fe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -27,9 +27,9 @@ public class YarnConfigKeys {
 	//  Environment variable names
 	// ------------------------------------------------------------------------
 
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
+	public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
+	public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
+	public static final String ENV_APP_ID = "_APP_ID";
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
 	public static final String ENV_SLOTS = "_SLOTS";
@@ -38,12 +38,12 @@ public class YarnConfigKeys {
 
 	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
 
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public final static String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
+	public static final String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public static final String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
 
-	public final static String KEYTAB_PATH = "_KEYTAB_PATH";
-	public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
-	public final static String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
+	public static final String KEYTAB_PATH = "_KEYTAB_PATH";
+	public static final String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+	public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
 	public static final String ENV_ZOOKEEPER_NAMESPACE = "_ZOOKEEPER_NAMESPACE";
 
 	public static final String ENV_KRB5_PATH = "_KRB5_PATH";
@@ -51,7 +51,7 @@ public class YarnConfigKeys {
 
 	// ------------------------------------------------------------------------
 
-	/** Private constructor to prevent instantiation */
+	/** Private constructor to prevent instantiation. */
 	private YarnConfigKeys() {}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
index 370df26..9a98519 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
 import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
@@ -34,7 +35,7 @@ public class YarnContainerInLaunch implements ResourceIDRetrievable {
 
 	private final long timestamp;
 
-	/** The resource id associated with this worker type */
+	/** The resource id associated with this worker type. */
 	private final ResourceID resourceID;
 
 	public YarnContainerInLaunch(Container container) {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
index 3f4d4f6..2ad9065 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
@@ -46,9 +45,10 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
+
+import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -57,32 +57,33 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * This class is the executable entry point for the YARN Application Master that
  * executes a single Flink job and then shuts the YARN application down.
- * 
+ *
  * <p>The lifetime of the YARN application bound to that of the Flink job. Other
  * YARN Application Master implementations are for example the YARN session.
- * 
- * It starts actor system and the actors for {@link JobManagerRunner}
+ *
+ * <p>It starts actor system and the actors for {@link JobManagerRunner}
  * and {@link YarnResourceManager}.
  *
- * The JobManagerRunner start a {@link org.apache.flink.runtime.jobmaster.JobMaster}
+ * <p>The JobManagerRunner start a {@link org.apache.flink.runtime.jobmaster.JobMaster}
  * JobMaster handles Flink job execution, while the YarnResourceManager handles container
  * allocation and failure detection.
  */
 public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicationMasterRunner
 		implements OnCompletionActions, FatalErrorHandler {
 
-	/** Logger */
 	protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
 
-	/** The job graph file path */
+	/** The job graph file path. */
 	private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
 
 	// ------------------------------------------------------------------------
 
-	/** The lock to guard startup / shutdown / manipulation methods */
+	/** The lock to guard startup / shutdown / manipulation methods. */
 	private final Object lock = new Object();
 
 	@GuardedBy("lock")
@@ -144,7 +145,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 					HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
 				heartbeatServices = HeartbeatServices.fromConfiguration(config);
-				
+
 				metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 				// ---- (2) init resource manager -------
@@ -310,7 +311,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	//----------------------------------------------------------------------------------------------
 
 	/**
-	 * Job completion notification triggered by JobManager
+	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
 	public void jobFinished(JobExecutionResult result) {
@@ -318,7 +319,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	}
 
 	/**
-	 * Job completion notification triggered by JobManager
+	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
 	public void jobFailed(Throwable cause) {
@@ -326,7 +327,7 @@ public class YarnFlinkApplicationMasterRunner extends AbstractYarnFlinkApplicati
 	}
 
 	/**
-	 * Job completion notification triggered by self
+	 * Job completion notification triggered by self.
 	 */
 	@Override
 	public void jobFinishedByOther() {

http://git-wip-us.apache.org/repos/asf/flink/blob/77b0fb9f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 3c85795..4626a7e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.yarn;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -34,6 +31,8 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.messages.ContainersAllocated;
 import org.apache.flink.yarn.messages.ContainersComplete;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -46,7 +45,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
 import org.slf4j.Logger;
 
 import java.lang.reflect.Method;
@@ -66,57 +64,57 @@ import static java.util.Objects.requireNonNull;
  */
 public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYarnWorkerNode> {
 
-	/** The heartbeat interval while the resource master is waiting for containers */
+	/** The heartbeat interval while the resource master is waiting for containers. */
 	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
 
-	/** The default heartbeat interval during regular operation */
+	/** The default heartbeat interval during regular operation. */
 	private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
 
 	/** Environment variable name of the final container id used by the Flink ResourceManager.
 	 * Container ID generation may vary across Hadoop versions. */
-	final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+	static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 
-	/** The containers where a TaskManager is starting and we are waiting for it to register */
+	/** The containers where a TaskManager is starting and we are waiting for it to register. */
 	private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
 
 	/** Containers we have released, where we are waiting for an acknowledgement that
-	 * they are released */
+	 * they are released. */
 	private final Map<ContainerId, Container> containersBeingReturned;
 
-	/** The YARN / Hadoop configuration object */
+	/** The YARN / Hadoop configuration object. */
 	private final YarnConfiguration yarnConfig;
 
-	/** The TaskManager container parameters (like container memory size) */
+	/** The TaskManager container parameters (like container memory size). */
 	private final ContaineredTaskManagerParameters taskManagerParameters;
 
-	/** Context information used to start a TaskManager Java process */
+	/** Context information used to start a TaskManager Java process. */
 	private final ContainerLaunchContext taskManagerLaunchContext;
 
-	/** Host name for the container running this process */
+	/** Host name for the container running this process. */
 	private final String applicationMasterHostName;
 
-	/** Web interface URL, may be null */
+	/** Web interface URL, may be null. */
 	private final String webInterfaceURL;
 
-	/** Default heartbeat interval between this actor and the YARN ResourceManager */
+	/** Default heartbeat interval between this actor and the YARN ResourceManager. */
 	private final int yarnHeartbeatIntervalMillis;
 
-	/** Number of failed TaskManager containers before stopping the application. -1 means infinite. */ 
+	/** Number of failed TaskManager containers before stopping the application. -1 means infinite. */
 	private final int maxFailedContainers;
 
-	/** Callback handler for the asynchronous resourceManagerClient */
+	/** Callback handler for the asynchronous resourceManagerClient. */
 	private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
 
-	/** Client to communicate with the Resource Manager (YARN's master) */
+	/** Client to communicate with the Resource Manager (YARN's master). */
 	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
 
-	/** Client to communicate with the Node manager and launch TaskManager processes */
+	/** Client to communicate with the Node manager and launch TaskManager processes. */
 	private NMClient nodeManagerClient;
 
-	/** The number of containers requested, but not yet granted */
+	/** The number of containers requested, but not yet granted. */
 	private int numPendingContainerRequests;
 
-	/** The number of failed containers since the master became active */
+	/** The number of failed containers since the master became active. */
 	private int failedContainersSoFar;
 
 	/** A reference to the reflector to look up previous session containers. */
@@ -428,7 +426,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	}
 
 	// ------------------------------------------------------------------------
-	//  Callbacks from the YARN Resource Manager 
+	//  Callbacks from the YARN Resource Manager
 	// ------------------------------------------------------------------------
 
 	private void containersAllocated(List<Container> containers) {
@@ -491,7 +489,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	/**
 	 * Invoked when the ResourceManager informs of completed containers.
 	 * Called via an actor message by the callback from the ResourceManager client.
-	 * 
+	 *
 	 * @param containers The containers that have completed.
 	 */
 	private void containersComplete(List<ContainerStatus> containers) {
@@ -624,8 +622,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 		private Logger logger;
 		private Method method;
 
-		public RegisterApplicationMasterResponseReflector(Logger LOG) {
-			this.logger = LOG;
+		public RegisterApplicationMasterResponseReflector(Logger log) {
+			this.logger = log;
 
 			try {
 				method = RegisterApplicationMasterResponse.class
@@ -671,12 +669,12 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
 	/**
 	 * Creates the props needed to instantiate this actor.
-	 * 
-	 * Rather than extracting and validating parameters in the constructor, this factory method takes
+	 *
+	 * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
 	 * care of that. That way, errors occur synchronously, and are not swallowed simply in a
 	 * failed asynchronous attempt to start the actor.
-	 
-	 * @param actorClass 
+	 *
+	 * @param actorClass
 	 *             The actor class, to allow overriding this actor with subclasses for testing.
 	 * @param flinkConfig
 	 *             The Flink configuration object.
@@ -694,7 +692,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 	 *             The initial number of TaskManagers to allocate.
 	 * @param log
 	 *             The logger to log to.
-	 * 
+	 *
 	 * @return The Props object to instantiate the YarnFlinkResourceManager actor.
 	 */
 	public static Props createActorProps(Class<? extends YarnFlinkResourceManager> actorClass,
@@ -706,8 +704,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			ContaineredTaskManagerParameters taskManagerParameters,
 			ContainerLaunchContext taskManagerLaunchContext,
 			int numInitialTaskManagers,
-			Logger log)
-	{
+			Logger log) {
+
 		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
 			ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;