You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 18:49:43 UTC

[2/5] flink git commit: [FLINK-6715] Activate strict checkstyle for flink-mesos

[FLINK-6715] Activate strict checkstyle for flink-mesos

This closes #3988.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bca76ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bca76ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bca76ed

Branch: refs/heads/master
Commit: 0bca76ede8b7447014a7d7ed17633d77ecfafe18
Parents: 0e69dd5
Author: zentol <ch...@apache.org>
Authored: Thu May 25 10:25:24 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:26 2017 +0200

----------------------------------------------------------------------
 flink-mesos/pom.xml                             |  35 +++++
 .../main/java/org/apache/flink/mesos/Utils.java |   9 +-
 .../flink/mesos/cli/FlinkMesosSessionCli.java   |  15 +-
 .../clusterframework/LaunchableMesosWorker.java |  35 ++---
 .../MesosApplicationMasterRunner.java           |  65 +++++----
 .../clusterframework/MesosConfigKeys.java       |  17 +--
 .../MesosFlinkResourceManager.java              | 101 +++++++-------
 .../MesosTaskManagerParameters.java             |  71 +++++-----
 .../MesosTaskManagerRunner.java                 |  19 ++-
 .../services/MesosServicesUtils.java            |   3 +
 .../services/StandaloneMesosServices.java       |   3 +-
 .../store/MesosWorkerStore.java                 |  13 +-
 .../store/StandaloneMesosWorkerStore.java       |   3 +-
 .../store/ZooKeeperMesosWorkerStore.java        |  16 ++-
 .../flink/mesos/scheduler/SchedulerProxy.java   |  10 +-
 .../mesos/scheduler/TaskSchedulerBuilder.java   |   2 +-
 .../scheduler/messages/ResourceOffers.java      |   1 +
 .../flink/mesos/util/MesosArtifactResolver.java |   3 +-
 .../flink/mesos/util/MesosArtifactServer.java   |  50 ++++---
 .../flink/mesos/util/MesosConfiguration.java    |  29 ++--
 .../apache/flink/mesos/util/ZooKeeperUtils.java |   6 +-
 .../MesosFlinkResourceManagerTest.java          | 122 +++++++++-------
 .../MesosTaskManagerParametersTest.java         | 139 ++++++++++---------
 23 files changed, 427 insertions(+), 340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 4dae731..94187ee 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -300,6 +300,41 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 7787e40..308e093 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -20,12 +20,17 @@ package org.apache.flink.mesos;
 
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.net.URL;
 import java.util.Arrays;
 
+import scala.Option;
+
+/**
+ * Collection of utility methods.
+ */
 public class Utils {
 	/**
 	 * Construct a Mesos environment variable.
@@ -53,7 +58,7 @@ public class Utils {
 	 */
 	public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
 		Option<URL> url = resolver.resolve(artifact.dest);
-		if(url.isEmpty()) {
+		if (url.isEmpty()) {
 			throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
index dcce0b8..f6850d2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.mesos.cli;
 
+import org.apache.flink.configuration.Configuration;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
 import java.util.Map;
@@ -35,28 +36,30 @@ public class FlinkMesosSessionCli {
 
 	/**
 	 * Decode encoded dynamic properties.
+	 *
 	 * @param dynamicPropertiesEncoded encoded properties produced by the encoding method.
 	 * @return a configuration instance to be merged with the static configuration.
 	 */
 	public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
 		try {
 			Configuration configuration = new Configuration();
-			if(dynamicPropertiesEncoded != null) {
-				TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {};
-				Map<String,String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
+			if (dynamicPropertiesEncoded != null) {
+				TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {
+				};
+				Map<String, String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
 				for (Map.Entry<String, String> property : props.entrySet()) {
 					configuration.setString(property.getKey(), property.getValue());
 				}
 			}
 			return configuration;
-		}
-		catch(IOException ex) {
+		} catch (IOException ex) {
 			throw new IllegalArgumentException("unreadable encoded properties", ex);
 		}
 	}
 
 	/**
 	 * Encode dynamic properties as a string to be transported as an environment variable.
+	 *
 	 * @param configuration the dynamic properties to encode.
 	 * @return a string to be decoded later.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 04a406f..ce7bb9d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -18,23 +18,23 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.TaskAssignmentResult;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.util.Collections;
 import java.util.List;
@@ -42,15 +42,17 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 
-import static org.apache.flink.mesos.Utils.variable;
+import scala.Option;
+
 import static org.apache.flink.mesos.Utils.range;
 import static org.apache.flink.mesos.Utils.ranges;
 import static org.apache.flink.mesos.Utils.scalar;
+import static org.apache.flink.mesos.Utils.variable;
 
 /**
  * Implements the launch of a Mesos worker.
  *
- * Translates the abstract {@link ContainerSpecification} into a concrete
+ * <p>Translates the abstract {@link ContainerSpecification} into a concrete
  * Mesos-specific {@link Protos.TaskInfo}.
  */
 public class LaunchableMesosWorker implements LaunchableTask {
@@ -59,9 +61,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	private static String[] TM_PORT_KEYS = {
+	private static final String[] TM_PORT_KEYS = {
 		"taskmanager.rpc.port",
-		"taskmanager.data.port" };
+		"taskmanager.data.port"};
 
 	private final MesosArtifactResolver resolver;
 	private final ContainerSpecification containerSpec;
@@ -88,7 +90,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		this.params = Preconditions.checkNotNull(params);
 		this.taskID = Preconditions.checkNotNull(taskID);
 		this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
-		
+
 		this.taskRequest = new Request();
 	}
 
@@ -204,7 +206,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		//configure task manager hostname property if hostname override property is supplied
 		Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
 
-		if(taskManagerHostnameOption.isDefined()) {
+		if (taskManagerHostnameOption.isDefined()) {
 			// replace the TASK_ID pattern by the actual task id value of the Mesos task
 			final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
 				.matcher(taskManagerHostnameOption.get())
@@ -225,7 +227,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		}
 
 		// ship additional files
-		for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+		for (ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
 			cmd.addUris(Utils.uri(resolver, artifact));
 		}
 
@@ -271,9 +273,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		// in event that no docker image or mesos image name is specified, we must still
 		// set type to MESOS
 		containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
-		switch(params.containerType()) {
+		switch (params.containerType()) {
 			case MESOS:
-				if(params.containerImageName().isDefined()) {
+				if (params.containerImageName().isDefined()) {
 					containerInfo
 						.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
 							.setImage(Protos.Image.newBuilder()
@@ -285,7 +287,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 			case DOCKER:
 				assert(params.containerImageName().isDefined());
-					containerInfo
+				containerInfo
 					.setType(Protos.ContainerInfo.Type.DOCKER)
 					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
 						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
@@ -300,7 +302,6 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		containerInfo.addAllVolumes(params.containerVolumes());
 		taskInfo.setContainer(containerInfo);
 
-
 		return taskInfo.build();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09ef380..fc75bd7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -62,15 +52,19 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.apache.mesos.Protos;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.Option;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
@@ -82,6 +76,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -89,24 +87,26 @@ import static org.apache.flink.util.Preconditions.checkState;
  * It starts actor system and the actors for {@link JobManager}
  * and {@link MesosFlinkResourceManager}.
  *
- * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * <p>The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
  * allocation and failure detection.
  */
 public class MesosApplicationMasterRunner {
-	/** Logger */
+
 	protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
 
-	/** The maximum time that TaskManagers may be waiting to register at the JobManager,
-	 * before they quit */
+	/**
+	 * The maximum time that TaskManagers may be waiting to register at the JobManager,
+	 * before they quit.
+	 */
 	private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
-	/** The exit code returned if the initialization of the application master failed */
+	/** The exit code returned if the initialization of the application master failed. */
 	private static final int INIT_ERROR_EXIT_CODE = 31;
 
-	/** The exit code returned if the process exits because a critical actor died */
+	/** The exit code returned if the process exits because a critical actor died. */
 	private static final int ACTOR_DIED_EXIT_CODE = 32;
 
 	// ------------------------------------------------------------------------
@@ -142,7 +142,7 @@ public class MesosApplicationMasterRunner {
 
 	/**
 	 * The instance entry point for the Mesos AppMaster. Obtains user group
-	 * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
+	 * information and calls the main work method {@link #runPrivileged(Configuration, Configuration)} as a
 	 * privileged action.
 	 *
 	 * @param args The command line arguments.
@@ -318,7 +318,6 @@ public class MesosApplicationMasterRunner {
 				getJobManagerClass(),
 				getArchivistClass())._1();
 
-
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
@@ -328,7 +327,7 @@ public class MesosApplicationMasterRunner {
 				actorSystem,
 				jobManager,
 				LOG);
-			if(webMonitor != null) {
+			if (webMonitor != null) {
 				final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
 				mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
 			}
@@ -380,7 +379,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(artifactServer != null) {
+			if (artifactServer != null) {
 				try {
 					artifactServer.stop();
 				} catch (Throwable ignored) {
@@ -396,7 +395,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(futureExecutor != null) {
+			if (futureExecutor != null) {
 				try {
 					futureExecutor.shutdownNow();
 				} catch (Throwable tt) {
@@ -404,7 +403,7 @@ public class MesosApplicationMasterRunner {
 				}
 			}
 
-			if(ioExecutor != null) {
+			if (ioExecutor != null) {
 				try {
 					ioExecutor.shutdownNow();
 				} catch (Throwable tt) {
@@ -493,7 +492,7 @@ public class MesosApplicationMasterRunner {
 			.setHostname(hostname);
 		Protos.Credential.Builder credential = null;
 
-		if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+		if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
 			throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
 		}
 		String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
@@ -517,7 +516,7 @@ public class MesosApplicationMasterRunner {
 			ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
 			ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
 
-		if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+		if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
 			frameworkInfo.setPrincipal(flinkConfig.getString(
 				ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
 
@@ -526,7 +525,7 @@ public class MesosApplicationMasterRunner {
 
 			// some environments use a side-channel to communicate the secret to Mesos,
 			// and thus don't set the 'secret' configuration setting
-			if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+			if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
 				credential.setSecret(flinkConfig.getString(
 					ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
 			}
@@ -545,7 +544,7 @@ public class MesosApplicationMasterRunner {
 	 * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
 	 * The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
 	 * A lightweight HTTP server serves the artifacts to the fetcher.
-     */
+	 */
 	private static void applyOverlays(
 		Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
 
@@ -565,7 +564,7 @@ public class MesosApplicationMasterRunner {
 
 	private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
 		// serve the artifacts associated with the container environment
-		for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+		for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
 			server.addPath(artifact.source, artifact.dest);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 35da95f..1af94e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -27,30 +27,31 @@ public class MesosConfigKeys {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * The Mesos task ID, used by the TM for informational purposes
+	 * The Mesos task ID, used by the TM for informational purposes.
 	 */
 	public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
 
 	/**
-	 * Reserved for future enhancement
+	 * Reserved for future enhancement.
 	 */
 	public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
 
 	/**
-	 * JVM arguments, used by the JM and TM
+	 * JVM arguments, used by the JM and TM.
 	 */
 	public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
 	/**
-	 * Standard environment variables used in DCOS environment
+	 * Standard environment variables used in DCOS environment.
 	 */
 	public static final String ENV_TASK_NAME = "TASK_NAME";
 
 	/**
- 	 * Standard environment variables used in DCOS environment
- 	 */
+	 * Standard environment variables used in DCOS environment.
+	 */
 	public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
 
-	/** Private constructor to prevent instantiation */
-	private MesosConfigKeys() {}
+	/** Private constructor to prevent instantiation. */
+	private MesosConfigKeys() {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 17ffef7..6c708fa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -18,20 +18,14 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
 import org.apache.flink.mesos.scheduler.SchedulerProxy;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
@@ -48,17 +42,23 @@ import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Protos.FrameworkInfo;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -66,6 +66,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -73,25 +75,25 @@ import static java.util.Objects.requireNonNull;
  */
 public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
 
-	/** The Mesos configuration (master and framework info) */
+	/** The Mesos configuration (master and framework info). */
 	private final MesosConfiguration mesosConfig;
 
-	/** The TaskManager container parameters (like container memory size) */
+	/** The TaskManager container parameters (like container memory size). */
 	private final MesosTaskManagerParameters taskManagerParameters;
 
-	/** Container specification for launching a TM */
+	/** Container specification for launching a TM. */
 	private final ContainerSpecification taskManagerContainerSpec;
 
-	/** Resolver for HTTP artifacts **/
+	/** Resolver for HTTP artifacts. **/
 	private final MesosArtifactResolver artifactResolver;
 
 	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
 	private final int maxFailedTasks;
 
-	/** Callback handler for the asynchronous Mesos scheduler */
+	/** Callback handler for the asynchronous Mesos scheduler. */
 	private SchedulerProxy schedulerCallbackHandler;
 
-	/** Mesos scheduler driver */
+	/** Mesos scheduler driver. */
 	private SchedulerDriver schedulerDriver;
 
 	private ActorRef connectionMonitor;
@@ -104,12 +106,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private final MesosWorkerStore workerStore;
 
-	/** planning state related to workers - package private for unit test purposes */
+	/** planning state related to workers - package private for unit test purposes. */
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
 	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
 
-	/** The number of failed tasks since the master became active */
+	/** The number of failed tasks since the master became active. */
 	private int failedTasksSoFar;
 
 	public MesosFlinkResourceManager(
@@ -158,7 +160,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			.setCheckpoint(true);
 
 		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
-		if(frameworkID.isEmpty()) {
+		if (frameworkID.isEmpty()) {
 			LOG.info("Registering as new framework.");
 		}
 		else {
@@ -248,7 +250,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
 			taskTerminated(msg.taskID(), msg.status());
 
-		} else  {
+		} else {
 			// message handled by the generic resource master code
 			super.handleMessage(message);
 		}
@@ -267,15 +269,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		try {
 			// unregister the framework, which implicitly removes all tasks.
 			schedulerDriver.stop(false);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			LOG.warn("unable to unregister the framework", ex);
 		}
 
 		try {
 			workerStore.stop(true);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			LOG.warn("unable to stop the worker state store", ex);
 		}
 
@@ -308,13 +308,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		if (!tasksFromPreviousAttempts.isEmpty()) {
 			LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
 
-			List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+			List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
 			List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
 
 			for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
 				LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
 
-				switch(worker.state()) {
+				switch (worker.state()) {
 					case New:
 						workersInNew.put(extractResourceID(worker.taskID()), worker);
 						toLaunch.add(launchable);
@@ -331,11 +331,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			}
 
 			// tell the launch coordinator about prior assignments
-			if(toAssign.size() >= 1) {
+			if (toAssign.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
 			}
 			// tell the launch coordinator to launch any new tasks
-			if(toLaunch.size() >= 1) {
+			if (toLaunch.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
 			}
 		}
@@ -374,11 +374,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			}
 
 			// tell the launch coordinator to launch the new tasks
-			if(toLaunch.size() >= 1) {
+			if (toLaunch.size() >= 1) {
 				launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
 			}
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to request new workers", ex);
 		}
 	}
@@ -386,7 +385,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/**
 	 * Accept offers as advised by the launch coordinator.
 	 *
-	 * Acceptance is routed through the RM to update the persistent state before
+	 * <p>Acceptance is routed through the RM to update the persistent state before
 	 * forwarding the message to Mesos.
 	 */
 	private void acceptOffers(AcceptOffers msg) {
@@ -421,15 +420,14 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 			// send the acceptance message to Mesos
 			schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to accept offers", ex);
 		}
 	}
 
 	/**
 	 * Handle a task status change.
-     */
+	 */
 	private void taskStatusUpdated(StatusUpdate message) {
 		taskRouter.tell(message, self());
 		reconciliationCoordinator.tell(message, self());
@@ -470,9 +468,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			if (worker != null) {
 				LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
 				accepted.add(new RegisteredMesosWorkerNode(worker));
-			}
-			else {
-				if(isStarted(resourceID)) {
+			} else {
+				if (isStarted(resourceID)) {
 					LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
 				}
 				else {
@@ -549,8 +546,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 		try {
 			workerStore.setFrameworkID(Option.apply(message.frameworkId()));
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to store the assigned framework ID", ex);
 			return;
 		}
@@ -598,13 +594,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 		boolean existed;
 		try {
 			existed = workerStore.removeWorker(taskID);
-		}
-		catch(Exception ex) {
+		} catch (Exception ex) {
 			fatalError("unable to remove worker", ex);
 			return;
 		}
 
-		if(!existed) {
+		if (!existed) {
 			LOG.info("Received a termination notice for an unrecognized worker: {}", id);
 			return;
 		}
@@ -695,11 +690,15 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 * @return goal state information for the {@Link TaskMonitor}.
 	 */
 	static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
-		switch(worker.state()) {
-			case New: return new TaskMonitor.New(worker.taskID());
-			case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
-			case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
-			default: throw new IllegalArgumentException("unsupported worker state");
+		switch (worker.state()) {
+			case New:
+				return new TaskMonitor.New(worker.taskID());
+			case Launched:
+				return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+			case Released:
+				return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+			default:
+				throw new IllegalArgumentException("unsupported worker state");
 		}
 	}
 
@@ -727,7 +726,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	/**
 	 * Creates the props needed to instantiate this actor.
 	 *
-	 * Rather than extracting and validating parameters in the constructor, this factory method takes
+	 * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
 	 * care of that. That way, errors occur synchronously, and are not swallowed simply in a
 	 * failed asynchronous attempt to start the actor.
 
@@ -746,7 +745,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 	 *
 	 * @return The Props object to instantiate the MesosFlinkResourceManager actor.
 	 */
-	public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+	public static Props createActorProps(
+			Class<? extends MesosFlinkResourceManager> actorClass,
 			Configuration flinkConfig,
 			MesosConfiguration mesosConfig,
 			MesosWorkerStore workerStore,
@@ -754,8 +754,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 			MesosTaskManagerParameters taskManagerParameters,
 			ContainerSpecification taskManagerContainerSpec,
 			MesosArtifactResolver artifactResolver,
-			Logger log)
-	{
+			Logger log) {
 
 		final int numInitialTaskManagers = flinkConfig.getInteger(
 			ConfigConstants.MESOS_INITIAL_TASKS, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4324469..f5a415e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -18,34 +18,36 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.functions.Func1;
-import com.netflix.fenzo.plugins.HostAttrValueConstraint;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.functions.Func1;
+import com.netflix.fenzo.plugins.HostAttrValueConstraint;
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import scala.Option;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
  * This class describes the Mesos-specific parameters for launching a TaskManager process.
  *
- * These parameters are in addition to the common parameters
+ * <p>These parameters are in addition to the common parameters
  * provided by {@link ContaineredTaskManagerParameters}.
  */
 public class MesosTaskManagerParameters {
 
-	/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+	/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task. */
 	public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
 
 	public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
@@ -75,11 +77,11 @@ public class MesosTaskManagerParameters {
 	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
 		key("mesos.resourcemanager.tasks.bootstrap-cmd")
 		.noDefaultValue();
-	
+
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
 		key("mesos.resourcemanager.tasks.container.volumes")
 		.noDefaultValue();
-	
+
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
 		.noDefaultValue();
@@ -102,9 +104,9 @@ public class MesosTaskManagerParameters {
 	private final ContaineredTaskManagerParameters containeredParameters;
 
 	private final List<Protos.Volume> containerVolumes;
-	
+
 	private final List<ConstraintEvaluator> constraints;
-	
+
 	private final Option<String> bootstrapCommand;
 
 	private final Option<String> taskManagerHostname;
@@ -129,10 +131,9 @@ public class MesosTaskManagerParameters {
 		this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
 	}
 
-
-    /**
+	/**
 	 * Get the CPU units to use for the TaskManager process.
-     */
+	 */
 	public double cpus() {
 		return cpus;
 	}
@@ -140,49 +141,53 @@ public class MesosTaskManagerParameters {
 	/**
 	 * Get the container type (Mesos or Docker).  The default is Mesos.
 	 *
-	 * Mesos provides a facility for a framework to specify which containerizer to use.
-     */
+	 * <p>Mesos provides a facility for a framework to specify which containerizer to use.
+	 */
 	public ContainerType containerType() {
 		return containerType;
 	}
 
 	/**
 	 * Get the container image name.
-     */
+	 */
 	public Option<String> containerImageName() {
 		return containerImageName;
 	}
 
 	/**
 	 * Get the common containered parameters.
-     */
+	 */
 	public ContaineredTaskManagerParameters containeredParameters() {
 		return containeredParameters;
 	}
 
 	/**
-	 * Get the container volumes string
+	 * Get the container volumes string.
 	 */
 	public List<Protos.Volume> containerVolumes() {
 		return containerVolumes;
 	}
 
 	/**
-	 * Get the placement constraints
+	 * Get the placement constraints.
 	 */
 	public List<ConstraintEvaluator> constraints() {
 		return constraints;
 	}
 
 	/**
- 	 * Get the taskManager hostname.
- 	 */
-	public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+	 * Get the taskManager hostname.
+	 */
+	public Option<String> getTaskManagerHostname() {
+		return taskManagerHostname;
+	}
 
 	/**
- 	 * Get the bootstrap command.
- 	 */
-	public Option<String> bootstrapCommand() { return bootstrapCommand;	}	
+	 * Get the bootstrap command.
+	 */
+	public Option<String> bootstrapCommand() {
+		return bootstrapCommand;
+	}
 
 	@Override
 	public String toString() {
@@ -200,8 +205,9 @@ public class MesosTaskManagerParameters {
 
 	/**
 	 * Create the Mesos TaskManager parameters.
+	 *
 	 * @param flinkConfig the TM configuration.
-     */
+	 */
 	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
 		List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
@@ -212,7 +218,7 @@ public class MesosTaskManagerParameters {
 			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
 
 		double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
-		if(cpus <= 0.0) {
+		if (cpus <= 0.0) {
 			cpus = Math.max(containeredParameters.numSlots(), 1.0);
 		}
 
@@ -221,13 +227,13 @@ public class MesosTaskManagerParameters {
 
 		ContainerType containerType;
 		String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
-		switch(containerTypeString) {
+		switch (containerTypeString) {
 			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
 				containerType = ContainerType.MESOS;
 				break;
 			case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
 				containerType = ContainerType.DOCKER;
-				if(imageName == null || imageName.length() == 0) {
+				if (imageName == null || imageName.length() == 0) {
 					throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
 						" must be specified for docker container type");
 				}
@@ -250,7 +256,7 @@ public class MesosTaskManagerParameters {
 			cpus,
 			containerType,
 			Option.apply(imageName),
-			containeredParameters,			
+			containeredParameters,
 			containerVolumes,
 			constraints,
 			tmBootstrapCommand,
@@ -287,7 +293,7 @@ public class MesosTaskManagerParameters {
 			}
 		}));
 	}
-	
+
 	/**
 	 * Used to build volume specs for mesos. This allows for mounting additional volumes into a container
 	 *
@@ -341,6 +347,9 @@ public class MesosTaskManagerParameters {
 		}
 	}
 
+	/**
+	 * The supported containerizers.
+	 */
 	public enum ContainerType {
 		MESOS,
 		DOCKER

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 625880b..e1b0efa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -36,14 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
 /**
  * The entry point for running a TaskManager in a Mesos container.
  */
@@ -59,7 +58,7 @@ public class MesosTaskManagerRunner {
 				.addOption(BootstrapTools.newDynamicPropertiesOption());
 	}
 
-	/** The process environment variables */
+	/** The process environment variables. */
 	private static final Map<String, String> ENV = System.getenv();
 
 	public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 0ac5f4e..a28020a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
 import org.apache.flink.util.ConfigurationUtil;
 
+/**
+ * Utilities for the {@link MesosServices}.
+ */
 public class MesosServicesUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index dfbc2c3..aa3157f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -35,5 +35,6 @@ public class StandaloneMesosServices implements MesosServices {
 	}
 
 	@Override
-	public void close(boolean cleanup) throws Exception {}
+	public void close(boolean cleanup) throws Exception {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index d6ff6bc..f1f54ce 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,13 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.List;
 import java.util.Objects;
 
+import scala.Option;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -83,7 +84,7 @@ public interface MesosWorkerStore {
 	/**
 	 * A stored worker.
 	 *
-	 * The assigned slaveid/hostname is valid in Launched and Released states.  The hostname is needed
+	 * <p>The assigned slaveid/hostname is valid in Launched and Released states.  The hostname is needed
 	 * by Fenzo for optimization purposes.
 	 */
 	class Worker implements Serializable {
@@ -112,28 +113,28 @@ public interface MesosWorkerStore {
 
 		/**
 		 * Get the worker's task ID.
-         */
+		 */
 		public Protos.TaskID taskID() {
 			return taskID;
 		}
 
 		/**
 		 * Get the worker's assigned slave ID.
-         */
+		 */
 		public Option<Protos.SlaveID> slaveID() {
 			return slaveID;
 		}
 
 		/**
 		 * Get the worker's assigned hostname.
-         */
+		 */
 		public Option<String> hostname() {
 			return hostname;
 		}
 
 		/**
 		 * Get the worker's state.
-         */
+		 */
 		public WorkerState state() {
 			return state;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
index c6ccbee..b43b89c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
@@ -19,13 +19,14 @@
 package org.apache.flink.mesos.runtime.clusterframework.store;
 
 import org.apache.mesos.Protos;
-import scala.Option;
 
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Option;
+
 /**
  * A standalone Mesos worker store.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 4544b8e..92e4416 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -26,11 +26,11 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
 import org.apache.flink.util.FlinkException;
+
 import org.apache.mesos.Protos;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +38,8 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 
+import scala.Option;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -53,13 +55,13 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 	/** Flag indicating whether this instance is running. */
 	private boolean isRunning;
 
-	/** A persistent value of the assigned framework ID */
+	/** A persistent value of the assigned framework ID. */
 	private final ZooKeeperSharedValue frameworkIdInZooKeeper;
 
-	/** A persistent count of all tasks created, for generating unique IDs */
+	/** A persistent count of all tasks created, for generating unique IDs. */
 	private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
 
-	/** A persistent store of serialized workers */
+	/** A persistent store of serialized workers. */
 	private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
 
 	@SuppressWarnings("unchecked")
@@ -69,7 +71,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
 		this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
 		this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
-		this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
+		this.totalTaskCountInZooKeeper = checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
 	}
 
 	@Override
@@ -89,7 +91,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 				frameworkIdInZooKeeper.close();
 				totalTaskCountInZooKeeper.close();
 
-				if(cleanup) {
+				if (cleanup) {
 					workersInZooKeeper.releaseAndTryRemoveAll();
 				}
 
@@ -237,7 +239,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
 		synchronized (startStopLock) {
 			verifyIsRunning();
 
-			if(workersInZooKeeper.exists(path) == -1) {
+			if (workersInZooKeeper.exists(path) == -1) {
 				LOG.debug("No such worker {} in ZooKeeper.", taskID);
 				return false;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
index b6d3383..0ac05c1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.mesos.scheduler;
 
-import akka.actor.ActorRef;
-
 import org.apache.flink.mesos.scheduler.messages.Disconnected;
 import org.apache.flink.mesos.scheduler.messages.Error;
 import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
@@ -30,6 +28,8 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
 import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
 import org.apache.flink.mesos.scheduler.messages.SlaveLost;
 import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+
+import akka.actor.ActorRef;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
@@ -39,14 +39,14 @@ import java.util.List;
 /**
  * This class reacts to callbacks from the Mesos scheduler driver.
  *
- * In order to preserve actor concurrency safety, this class simply sends
+ * <p>In order to preserve actor concurrency safety, this class simply sends
  * corresponding messages to the Mesos resource master actor.
  *
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ * <p>See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
  */
 public class SchedulerProxy implements Scheduler {
 
-	/** The actor to which we report the callbacks */
+	/** The actor to which we report the callbacks. */
 	private final ActorRef mesosActor;
 
 	public SchedulerProxy(ActorRef mesosActor) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
index c841e22..a5653e1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
@@ -25,7 +25,7 @@ import com.netflix.fenzo.functions.Action1;
 /**
  * A builder for the Fenzo task scheduler.
  *
- * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
+ * <p>Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
  */
 public interface TaskSchedulerBuilder {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
index dc5111d..8ca7e42 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
@@ -22,6 +22,7 @@ import org.apache.mesos.Protos;
 
 import java.io.Serializable;
 import java.util.List;
+
 import static java.util.Objects.requireNonNull;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
index a6a26dc..c62ab84 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -19,10 +19,11 @@
 package org.apache.flink.mesos.util;
 
 import org.apache.flink.core.fs.Path;
-import scala.Option;
 
 import java.net.URL;
 
+import scala.Option;
+
 /**
  * An interface for resolving artifact URIs.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index ae826db..967d818 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.mesos.util;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -42,25 +50,17 @@ import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedStream;
-
 import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.handler.ssl.SslHandler;
 import io.netty.util.CharsetUtil;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.SSLUtils;
 import org.jets3t.service.utils.Mimetypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -69,6 +69,8 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 
+import scala.Option;
+
 import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -81,11 +83,10 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
-
 /**
  * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
  *
- * More information:
+ * <p>More information:
  * http://mesos.apache.org/documentation/latest/fetcher/
  * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
  */
@@ -101,7 +102,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 	private final URL baseURL;
 
-	private final Map<Path,URL> paths = new HashMap<>();
+	private final Map<Path, URL> paths = new HashMap<>();
 
 	private final SSLContext serverSSLContext;
 
@@ -153,7 +154,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 			}
 		};
 
-		NioEventLoopGroup bossGroup   = new NioEventLoopGroup(1);
+		NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
 		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 
 		this.bootstrap = new ServerBootstrap();
@@ -169,7 +170,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		String address = bindAddress.getAddress().getHostAddress();
 		int port = bindAddress.getPort();
 
-		String httpProtocol = (serverSSLContext != null) ? "https": "http";
+		String httpProtocol = (serverSSLContext != null) ? "https" : "http";
 
 		baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
 
@@ -201,7 +202,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	 * @param remoteFile the remote path with which to locate the file.
 	 * @return the fully-qualified remote path to the file.
 	 * @throws MalformedURLException if the remote path is invalid.
-     */
+	 */
 	public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
 		return addPath(new Path(localFile.toURI()), new Path(remoteFile));
 	}
@@ -214,10 +215,10 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	 * @throws MalformedURLException if the remote path is invalid.
 	 */
 	public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
-		if(paths.containsKey(remoteFile)) {
+		if (paths.containsKey(remoteFile)) {
 			throw new IllegalArgumentException("duplicate path registered");
 		}
-		if(remoteFile.isAbsolute()) {
+		if (remoteFile.isAbsolute()) {
 			throw new IllegalArgumentException("not expecting an absolute path");
 		}
 		URL fileURL = new URL(baseURL, remoteFile.toString());
@@ -229,7 +230,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 	}
 
 	public synchronized void removePath(Path remoteFile) {
-		if(paths.containsKey(remoteFile)) {
+		if (paths.containsKey(remoteFile)) {
 			URL fileURL = null;
 			try {
 				fileURL = new URL(baseURL, remoteFile.toString());
@@ -274,11 +275,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 
 		public VirtualFileServerHandler(Path path) throws IOException {
 			this.path = path;
-			if(!path.isAbsolute()) {
+			if (!path.isAbsolute()) {
 				throw new IllegalArgumentException("path must be absolute: " + path.toString());
 			}
 			this.fs = path.getFileSystem();
-			if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+			if (!fs.exists(path) || fs.getFileStatus(path).isDir()) {
 				throw new IllegalArgumentException("no such file: " + path.toString());
 			}
 		}
@@ -292,12 +293,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				LOG.debug("{} request for file '{}'", request.getMethod(), path);
 			}
 
-			if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
+			if (!(request.getMethod() == GET || request.getMethod() == HEAD)) {
 				sendMethodNotAllowed(ctx);
 				return;
 			}
 
-
 			final FileStatus status;
 			try {
 				status = fs.getFileStatus(path);
@@ -322,8 +322,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 				final FSDataInputStream stream = fs.open(path);
 				try {
 					ctx.write(new ChunkedStream(stream));
-				}
-				catch(Exception e) {
+				} catch (Exception e) {
 					stream.close();
 					throw e;
 				}
@@ -369,7 +368,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
 		}
 	}
 
-
 	/**
 	 * Handle a request for a non-existent file.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
index 40dc41c..7660e9c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
@@ -23,10 +23,11 @@ import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
 import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
-import scala.Option;
 
 import java.util.Map;
 
+import scala.Option;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -53,7 +54,7 @@ public class MesosConfiguration {
 	/**
 	 * The Mesos connection string.
 	 *
-	 * The value should be in one of the following forms:
+	 * <p>The value should be in one of the following forms:
 	 * <pre>
 	 * {@code
 	 *     host:port
@@ -62,28 +63,28 @@ public class MesosConfiguration {
 	 *     file:///path/to/file (where file contains one of the above)
 	 * }
 	 * </pre>
-     */
+	 */
 	public String masterUrl() {
 		return masterUrl;
 	}
 
 	/**
 	 * The framework registration info.
-     */
+	 */
 	public Protos.FrameworkInfo.Builder frameworkInfo() {
 		return frameworkInfo;
 	}
 
 	/**
 	 * The credential to authenticate the framework principal.
-     */
+	 */
 	public Option<Protos.Credential.Builder> credential() {
 		return credential;
 	}
 
 	/**
 	 * Revise the configuration with updated framework info.
-     */
+	 */
 	public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo) {
 		return new MesosConfiguration(masterUrl, frameworkInfo, credential);
 	}
@@ -92,11 +93,11 @@ public class MesosConfiguration {
 	 * Create the Mesos scheduler driver based on this configuration.
 	 * @param scheduler the scheduler to use.
 	 * @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements.
-     * @return a scheduler driver.
-     */
+	 * @return a scheduler driver.
+	 */
 	public SchedulerDriver createDriver(Scheduler scheduler, boolean implicitAcknowledgements) {
 		MesosSchedulerDriver schedulerDriver;
-		if(this.credential().isDefined()) {
+		if (this.credential().isDefined()) {
 			schedulerDriver =
 				new MesosSchedulerDriver(scheduler, frameworkInfo.build(), this.masterUrl(),
 					implicitAcknowledgements, this.credential().get().build());
@@ -119,11 +120,11 @@ public class MesosConfiguration {
 	}
 
 	/**
-	 * A utility method to log relevant Mesos connection info
-     */
+	 * A utility method to log relevant Mesos connection info.
+	 */
 	public static void logMesosConfig(Logger log, MesosConfiguration config) {
 
-		Map<String,String> env = System.getenv();
+		Map<String, String> env = System.getenv();
 		Protos.FrameworkInfo.Builder info = config.frameworkInfo();
 
 		log.info("--------------------------------------------------------------------------------");
@@ -137,10 +138,10 @@ public class MesosConfiguration {
 		log.info("    Role: {}", info.hasRole() ? info.getRole() : "(none)");
 		log.info("    Principal: {}", info.hasPrincipal() ? info.getPrincipal() : "(none)");
 		log.info("    Host: {}", info.hasHostname() ? info.getHostname() : "(none)");
-		if(env.containsKey("LIBPROCESS_IP")) {
+		if (env.containsKey("LIBPROCESS_IP")) {
 			log.info("    LIBPROCESS_IP: {}", env.get("LIBPROCESS_IP"));
 		}
-		if(env.containsKey("LIBPROCESS_PORT")) {
+		if (env.containsKey("LIBPROCESS_PORT")) {
 			log.info("    LIBPROCESS_PORT: {}", env.get("LIBPROCESS_PORT"));
 		}
 		log.info("    Web UI: {}", info.hasWebuiUrl() ? info.getWebuiUrl() : "(none)");

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
index 6892a65..4211642 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.mesos.util;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Utilities for Zookeeper.
+ */
 public class ZooKeeperUtils {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7ab4e40..af3f7ef 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,29 +18,33 @@
 
 package org.apache.flink.mesos.runtime.clusterframework;
 
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
 import org.apache.flink.mesos.scheduler.ConnectionMonitor;
 import org.apache.flink.mesos.scheduler.LaunchCoordinator;
 import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
 import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
 import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.messages.*;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -50,27 +54,43 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
-import org.apache.mesos.SchedulerDriver;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
 import org.apache.mesos.Protos;
 import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 
-import static java.util.Collections.singletonList;
+import scala.Option;
 
+import static java.util.Collections.singletonList;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
 import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * General tests for the Mesos resource manager component.
@@ -125,13 +145,24 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 		}
 
 		@Override
-		protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+		protected ActorRef createConnectionMonitor() {
+			return connectionMonitor.ref();
+		}
+
 		@Override
-		protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+		protected ActorRef createTaskRouter() {
+			return taskRouter.ref();
+		}
+
 		@Override
-		protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+		protected ActorRef createLaunchCoordinator() {
+			return launchCoordinator.ref();
+		}
+
 		@Override
-		protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+		protected ActorRef createReconciliationCoordinator() {
+			return reconciliationCoordinator.ref();
+		}
 
 		@Override
 		protected void fatalError(String message, Throwable error) {
@@ -214,11 +245,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 			ContaineredTaskManagerParameters containeredParams =
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
-				1.0, 
-				MesosTaskManagerParameters.ContainerType.MESOS, 
-				Option.<String>empty(), 
-				containeredParams, 
-				Collections.<Protos.Volume>emptyList(), 
+				1.0,
+				MesosTaskManagerParameters.ContainerType.MESOS,
+				Option.<String>empty(),
+				containeredParams,
+				Collections.<Protos.Volume>emptyList(),
 				Collections.<ConstraintEvaluator>emptyList(),
 				Option.<String>empty(),
 				Option.<String>empty());
@@ -301,8 +332,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
 
 						register(Collections.<ResourceID>emptyList());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -331,8 +361,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
 						register(singletonList(extractResourceID(task1)));
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -364,8 +393,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 
 						// verify that the internal state was updated
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -409,8 +437,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						// verify that the instance state was updated
 						assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
 						verify(workerStore).removeWorker(task1);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -442,8 +469,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
 						resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -508,8 +534,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.taskRouter.expectMsg(
 							new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
 						verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -567,8 +592,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						// verify that the instance state was updated
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
 						verify(workerStore).newTaskID();
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -604,8 +628,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
 						expectMsgClass(ResourceRemoved.class);
 						verify(workerStore).newTaskID();
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -632,8 +655,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						verify(schedulerDriver).stop(false);
 						verify(workerStore).stop(true);
 						expectTerminated(resourceManager.actor());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -665,8 +687,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -697,8 +718,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -726,8 +746,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
 						resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
 						resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}
@@ -752,8 +771,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 						watch(resourceManager.actor());
 						resourceManager.tell(new Error("test"), resourceManager);
 						expectTerminated(resourceManager.actor());
-					}
-					catch(Exception ex) {
+					} catch (Exception ex) {
 						throw new RuntimeException(ex);
 					}
 				}