You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/25 18:49:43 UTC
[2/5] flink git commit: [FLINK-6715] Activate strict checkstyle for flink-mesos
[FLINK-6715] Activate strict checkstyle for flink-mesos
This closes #3988.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bca76ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bca76ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bca76ed
Branch: refs/heads/master
Commit: 0bca76ede8b7447014a7d7ed17633d77ecfafe18
Parents: 0e69dd5
Author: zentol <ch...@apache.org>
Authored: Thu May 25 10:25:24 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu May 25 15:56:26 2017 +0200
----------------------------------------------------------------------
flink-mesos/pom.xml | 35 +++++
.../main/java/org/apache/flink/mesos/Utils.java | 9 +-
.../flink/mesos/cli/FlinkMesosSessionCli.java | 15 +-
.../clusterframework/LaunchableMesosWorker.java | 35 ++---
.../MesosApplicationMasterRunner.java | 65 +++++----
.../clusterframework/MesosConfigKeys.java | 17 +--
.../MesosFlinkResourceManager.java | 101 +++++++-------
.../MesosTaskManagerParameters.java | 71 +++++-----
.../MesosTaskManagerRunner.java | 19 ++-
.../services/MesosServicesUtils.java | 3 +
.../services/StandaloneMesosServices.java | 3 +-
.../store/MesosWorkerStore.java | 13 +-
.../store/StandaloneMesosWorkerStore.java | 3 +-
.../store/ZooKeeperMesosWorkerStore.java | 16 ++-
.../flink/mesos/scheduler/SchedulerProxy.java | 10 +-
.../mesos/scheduler/TaskSchedulerBuilder.java | 2 +-
.../scheduler/messages/ResourceOffers.java | 1 +
.../flink/mesos/util/MesosArtifactResolver.java | 3 +-
.../flink/mesos/util/MesosArtifactServer.java | 50 ++++---
.../flink/mesos/util/MesosConfiguration.java | 29 ++--
.../apache/flink/mesos/util/ZooKeeperUtils.java | 6 +-
.../MesosFlinkResourceManagerTest.java | 122 +++++++++-------
.../MesosTaskManagerParametersTest.java | 139 ++++++++++---------
23 files changed, 427 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/pom.xml
----------------------------------------------------------------------
diff --git a/flink-mesos/pom.xml b/flink-mesos/pom.xml
index 4dae731..94187ee 100644
--- a/flink-mesos/pom.xml
+++ b/flink-mesos/pom.xml
@@ -300,6 +300,41 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 7787e40..308e093 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -20,12 +20,17 @@ package org.apache.flink.mesos;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+
import org.apache.mesos.Protos;
-import scala.Option;
import java.net.URL;
import java.util.Arrays;
+import scala.Option;
+
+/**
+ * Collection of utility methods.
+ */
public class Utils {
/**
* Construct a Mesos environment variable.
@@ -53,7 +58,7 @@ public class Utils {
*/
public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
Option<URL> url = resolver.resolve(artifact.dest);
- if(url.isEmpty()) {
+ if (url.isEmpty()) {
throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
index dcce0b8..f6850d2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java
@@ -18,10 +18,11 @@
package org.apache.flink.mesos.cli;
+import org.apache.flink.configuration.Configuration;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.configuration.Configuration;
import java.io.IOException;
import java.util.Map;
@@ -35,28 +36,30 @@ public class FlinkMesosSessionCli {
/**
* Decode encoded dynamic properties.
+ *
* @param dynamicPropertiesEncoded encoded properties produced by the encoding method.
* @return a configuration instance to be merged with the static configuration.
*/
public static Configuration decodeDynamicProperties(String dynamicPropertiesEncoded) {
try {
Configuration configuration = new Configuration();
- if(dynamicPropertiesEncoded != null) {
- TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {};
- Map<String,String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
+ if (dynamicPropertiesEncoded != null) {
+ TypeReference<Map<String, String>> typeRef = new TypeReference<Map<String, String>>() {
+ };
+ Map<String, String> props = mapper.readValue(dynamicPropertiesEncoded, typeRef);
for (Map.Entry<String, String> property : props.entrySet()) {
configuration.setString(property.getKey(), property.getValue());
}
}
return configuration;
- }
- catch(IOException ex) {
+ } catch (IOException ex) {
throw new IllegalArgumentException("unreadable encoded properties", ex);
}
}
/**
* Encode dynamic properties as a string to be transported as an environment variable.
+ *
* @param configuration the dynamic properties to encode.
* @return a string to be decoded later.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 04a406f..ce7bb9d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -18,23 +18,23 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.TaskAssignmentResult;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.util.Collections;
import java.util.List;
@@ -42,15 +42,17 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
-import static org.apache.flink.mesos.Utils.variable;
+import scala.Option;
+
import static org.apache.flink.mesos.Utils.range;
import static org.apache.flink.mesos.Utils.ranges;
import static org.apache.flink.mesos.Utils.scalar;
+import static org.apache.flink.mesos.Utils.variable;
/**
* Implements the launch of a Mesos worker.
*
- * Translates the abstract {@link ContainerSpecification} into a concrete
+ * <p>Translates the abstract {@link ContainerSpecification} into a concrete
* Mesos-specific {@link Protos.TaskInfo}.
*/
public class LaunchableMesosWorker implements LaunchableTask {
@@ -59,9 +61,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
/**
* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
*/
- private static String[] TM_PORT_KEYS = {
+ private static final String[] TM_PORT_KEYS = {
"taskmanager.rpc.port",
- "taskmanager.data.port" };
+ "taskmanager.data.port"};
private final MesosArtifactResolver resolver;
private final ContainerSpecification containerSpec;
@@ -88,7 +90,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
this.params = Preconditions.checkNotNull(params);
this.taskID = Preconditions.checkNotNull(taskID);
this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
-
+
this.taskRequest = new Request();
}
@@ -204,7 +206,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
//configure task manager hostname property if hostname override property is supplied
Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
- if(taskManagerHostnameOption.isDefined()) {
+ if (taskManagerHostnameOption.isDefined()) {
// replace the TASK_ID pattern by the actual task id value of the Mesos task
final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
.matcher(taskManagerHostnameOption.get())
@@ -225,7 +227,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
}
// ship additional files
- for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+ for (ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
cmd.addUris(Utils.uri(resolver, artifact));
}
@@ -271,9 +273,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
// in event that no docker image or mesos image name is specified, we must still
// set type to MESOS
containerInfo.setType(Protos.ContainerInfo.Type.MESOS);
- switch(params.containerType()) {
+ switch (params.containerType()) {
case MESOS:
- if(params.containerImageName().isDefined()) {
+ if (params.containerImageName().isDefined()) {
containerInfo
.setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
.setImage(Protos.Image.newBuilder()
@@ -285,7 +287,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
case DOCKER:
assert(params.containerImageName().isDefined());
- containerInfo
+ containerInfo
.setType(Protos.ContainerInfo.Type.DOCKER)
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
@@ -300,7 +302,6 @@ public class LaunchableMesosWorker implements LaunchableTask {
containerInfo.addAllVolumes(params.containerVolumes());
taskInfo.setContainer(containerInfo);
-
return taskInfo.build();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 09ef380..fc75bd7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -18,16 +18,6 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Props;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -62,15 +52,19 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
-import org.apache.mesos.Protos;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.Props;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
@@ -82,6 +76,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -89,24 +87,26 @@ import static org.apache.flink.util.Preconditions.checkState;
* It starts actor system and the actors for {@link JobManager}
* and {@link MesosFlinkResourceManager}.
*
- * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * <p>The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
* allocation and failure detection.
*/
public class MesosApplicationMasterRunner {
- /** Logger */
+
protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
- /** The maximum time that TaskManagers may be waiting to register at the JobManager,
- * before they quit */
+ /**
+ * The maximum time that TaskManagers may be waiting to register at the JobManager,
+ * before they quit.
+ */
private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
- /** The process environment variables */
+ /** The process environment variables. */
private static final Map<String, String> ENV = System.getenv();
- /** The exit code returned if the initialization of the application master failed */
+ /** The exit code returned if the initialization of the application master failed. */
private static final int INIT_ERROR_EXIT_CODE = 31;
- /** The exit code returned if the process exits because a critical actor died */
+ /** The exit code returned if the process exits because a critical actor died. */
private static final int ACTOR_DIED_EXIT_CODE = 32;
// ------------------------------------------------------------------------
@@ -142,7 +142,7 @@ public class MesosApplicationMasterRunner {
/**
* The instance entry point for the Mesos AppMaster. Obtains user group
- * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
+ * information and calls the main work method {@link #runPrivileged(Configuration, Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
@@ -318,7 +318,6 @@ public class MesosApplicationMasterRunner {
getJobManagerClass(),
getArchivistClass())._1();
-
// 2: the web monitor
LOG.debug("Starting Web Frontend");
@@ -328,7 +327,7 @@ public class MesosApplicationMasterRunner {
actorSystem,
jobManager,
LOG);
- if(webMonitor != null) {
+ if (webMonitor != null) {
final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/");
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
}
@@ -380,7 +379,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(artifactServer != null) {
+ if (artifactServer != null) {
try {
artifactServer.stop();
} catch (Throwable ignored) {
@@ -396,7 +395,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(futureExecutor != null) {
+ if (futureExecutor != null) {
try {
futureExecutor.shutdownNow();
} catch (Throwable tt) {
@@ -404,7 +403,7 @@ public class MesosApplicationMasterRunner {
}
}
- if(ioExecutor != null) {
+ if (ioExecutor != null) {
try {
ioExecutor.shutdownNow();
} catch (Throwable tt) {
@@ -493,7 +492,7 @@ public class MesosApplicationMasterRunner {
.setHostname(hostname);
Protos.Credential.Builder credential = null;
- if(!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
+ if (!flinkConfig.containsKey(ConfigConstants.MESOS_MASTER_URL)) {
throw new IllegalConfigurationException(ConfigConstants.MESOS_MASTER_URL + " must be configured.");
}
String masterUrl = flinkConfig.getString(ConfigConstants.MESOS_MASTER_URL, null);
@@ -517,7 +516,7 @@ public class MesosApplicationMasterRunner {
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
- if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+ if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -526,7 +525,7 @@ public class MesosApplicationMasterRunner {
// some environments use a side-channel to communicate the secret to Mesos,
// and thus don't set the 'secret' configuration setting
- if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+ if (flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
credential.setSecret(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
}
@@ -545,7 +544,7 @@ public class MesosApplicationMasterRunner {
* needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
- */
+ */
private static void applyOverlays(
Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
@@ -565,7 +564,7 @@ public class MesosApplicationMasterRunner {
private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
// serve the artifacts associated with the container environment
- for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+ for (ContainerSpecification.Artifact artifact : container.getArtifacts()) {
server.addPath(artifact.source, artifact.dest);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index 35da95f..1af94e2 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -27,30 +27,31 @@ public class MesosConfigKeys {
// ------------------------------------------------------------------------
/**
- * The Mesos task ID, used by the TM for informational purposes
+ * The Mesos task ID, used by the TM for informational purposes.
*/
public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
/**
- * Reserved for future enhancement
+ * Reserved for future enhancement.
*/
public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
/**
- * JVM arguments, used by the JM and TM
+ * JVM arguments, used by the JM and TM.
*/
public static final String ENV_JVM_ARGS = "JVM_ARGS";
/**
- * Standard environment variables used in DCOS environment
+ * Standard environment variables used in DCOS environment.
*/
public static final String ENV_TASK_NAME = "TASK_NAME";
/**
- * Standard environment variables used in DCOS environment
- */
+ * Standard environment variables used in DCOS environment.
+ */
public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
- /** Private constructor to prevent instantiation */
- private MesosConfigKeys() {}
+ /** Private constructor to prevent instantiation. */
+ private MesosConfigKeys() {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 17ffef7..6c708fa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -18,20 +18,14 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import com.netflix.fenzo.TaskRequest;
-import com.netflix.fenzo.TaskScheduler;
-import com.netflix.fenzo.VirtualMachineLease;
-import com.netflix.fenzo.functions.Action1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
-import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
import org.apache.flink.mesos.scheduler.SchedulerProxy;
import org.apache.flink.mesos.scheduler.TaskMonitor;
@@ -48,17 +42,23 @@ import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.FrameworkInfo;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
-import scala.Option;
import java.util.ArrayList;
import java.util.Collection;
@@ -66,6 +66,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
+
import static java.util.Objects.requireNonNull;
/**
@@ -73,25 +75,25 @@ import static java.util.Objects.requireNonNull;
*/
public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
- /** The Mesos configuration (master and framework info) */
+ /** The Mesos configuration (master and framework info). */
private final MesosConfiguration mesosConfig;
- /** The TaskManager container parameters (like container memory size) */
+ /** The TaskManager container parameters (like container memory size). */
private final MesosTaskManagerParameters taskManagerParameters;
- /** Container specification for launching a TM */
+ /** Container specification for launching a TM. */
private final ContainerSpecification taskManagerContainerSpec;
- /** Resolver for HTTP artifacts **/
+ /** Resolver for HTTP artifacts. **/
private final MesosArtifactResolver artifactResolver;
/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
private final int maxFailedTasks;
- /** Callback handler for the asynchronous Mesos scheduler */
+ /** Callback handler for the asynchronous Mesos scheduler. */
private SchedulerProxy schedulerCallbackHandler;
- /** Mesos scheduler driver */
+ /** Mesos scheduler driver. */
private SchedulerDriver schedulerDriver;
private ActorRef connectionMonitor;
@@ -104,12 +106,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private final MesosWorkerStore workerStore;
- /** planning state related to workers - package private for unit test purposes */
+ /** planning state related to workers - package private for unit test purposes. */
final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
- /** The number of failed tasks since the master became active */
+ /** The number of failed tasks since the master became active. */
private int failedTasksSoFar;
public MesosFlinkResourceManager(
@@ -158,7 +160,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
.setCheckpoint(true);
Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
- if(frameworkID.isEmpty()) {
+ if (frameworkID.isEmpty()) {
LOG.info("Registering as new framework.");
}
else {
@@ -248,7 +250,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
taskTerminated(msg.taskID(), msg.status());
- } else {
+ } else {
// message handled by the generic resource master code
super.handleMessage(message);
}
@@ -267,15 +269,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
try {
// unregister the framework, which implicitly removes all tasks.
schedulerDriver.stop(false);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
LOG.warn("unable to unregister the framework", ex);
}
try {
workerStore.stop(true);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
LOG.warn("unable to stop the worker state store", ex);
}
@@ -308,13 +308,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
if (!tasksFromPreviousAttempts.isEmpty()) {
LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
- List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+ List<Tuple2<TaskRequest, String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
- switch(worker.state()) {
+ switch (worker.state()) {
case New:
workersInNew.put(extractResourceID(worker.taskID()), worker);
toLaunch.add(launchable);
@@ -331,11 +331,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
}
// tell the launch coordinator about prior assignments
- if(toAssign.size() >= 1) {
+ if (toAssign.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
}
// tell the launch coordinator to launch any new tasks
- if(toLaunch.size() >= 1) {
+ if (toLaunch.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
}
}
@@ -374,11 +374,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
}
// tell the launch coordinator to launch the new tasks
- if(toLaunch.size() >= 1) {
+ if (toLaunch.size() >= 1) {
launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
}
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to request new workers", ex);
}
}
@@ -386,7 +385,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
/**
* Accept offers as advised by the launch coordinator.
*
- * Acceptance is routed through the RM to update the persistent state before
+ * <p>Acceptance is routed through the RM to update the persistent state before
* forwarding the message to Mesos.
*/
private void acceptOffers(AcceptOffers msg) {
@@ -421,15 +420,14 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
// send the acceptance message to Mesos
schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to accept offers", ex);
}
}
/**
* Handle a task status change.
- */
+ */
private void taskStatusUpdated(StatusUpdate message) {
taskRouter.tell(message, self());
reconciliationCoordinator.tell(message, self());
@@ -470,9 +468,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
if (worker != null) {
LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
accepted.add(new RegisteredMesosWorkerNode(worker));
- }
- else {
- if(isStarted(resourceID)) {
+ } else {
+ if (isStarted(resourceID)) {
LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
}
else {
@@ -549,8 +546,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
try {
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to store the assigned framework ID", ex);
return;
}
@@ -598,13 +594,12 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
boolean existed;
try {
existed = workerStore.removeWorker(taskID);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
fatalError("unable to remove worker", ex);
return;
}
- if(!existed) {
+ if (!existed) {
LOG.info("Received a termination notice for an unrecognized worker: {}", id);
return;
}
@@ -695,11 +690,15 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
* @return goal state information for the {@link TaskMonitor}.
*/
static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) {
- switch(worker.state()) {
- case New: return new TaskMonitor.New(worker.taskID());
- case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
- case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
- default: throw new IllegalArgumentException("unsupported worker state");
+ switch (worker.state()) {
+ case New:
+ return new TaskMonitor.New(worker.taskID());
+ case Launched:
+ return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get());
+ case Released:
+ return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get());
+ default:
+ throw new IllegalArgumentException("unsupported worker state");
}
}
@@ -727,7 +726,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
/**
* Creates the props needed to instantiate this actor.
*
- * Rather than extracting and validating parameters in the constructor, this factory method takes
+ * <p>Rather than extracting and validating parameters in the constructor, this factory method takes
* care of that. That way, errors occur synchronously, and are not swallowed simply in a
* failed asynchronous attempt to start the actor.
@@ -746,7 +745,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
*
* @return The Props object to instantiate the MesosFlinkResourceManager actor.
*/
- public static Props createActorProps(Class<? extends MesosFlinkResourceManager> actorClass,
+ public static Props createActorProps(
+ Class<? extends MesosFlinkResourceManager> actorClass,
Configuration flinkConfig,
MesosConfiguration mesosConfig,
MesosWorkerStore workerStore,
@@ -754,8 +754,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
MesosArtifactResolver artifactResolver,
- Logger log)
- {
+ Logger log) {
final int numInitialTaskManagers = flinkConfig.getInteger(
ConfigConstants.MESOS_INITIAL_TASKS, 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 4324469..f5a415e 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -18,34 +18,36 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import com.netflix.fenzo.ConstraintEvaluator;
-import com.netflix.fenzo.functions.Func1;
-import com.netflix.fenzo.plugins.HostAttrValueConstraint;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.functions.Func1;
+import com.netflix.fenzo.plugins.HostAttrValueConstraint;
import org.apache.mesos.Protos;
-import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
+import scala.Option;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/**
* This class describes the Mesos-specific parameters for launching a TaskManager process.
*
- * These parameters are in addition to the common parameters
+ * <p>These parameters are in addition to the common parameters
* provided by {@link ContaineredTaskManagerParameters}.
*/
public class MesosTaskManagerParameters {
- /** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+ /** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task. */
public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
@@ -75,11 +77,11 @@ public class MesosTaskManagerParameters {
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
key("mesos.resourcemanager.tasks.bootstrap-cmd")
.noDefaultValue();
-
+
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();
-
+
public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
.noDefaultValue();
@@ -102,9 +104,9 @@ public class MesosTaskManagerParameters {
private final ContaineredTaskManagerParameters containeredParameters;
private final List<Protos.Volume> containerVolumes;
-
+
private final List<ConstraintEvaluator> constraints;
-
+
private final Option<String> bootstrapCommand;
private final Option<String> taskManagerHostname;
@@ -129,10 +131,9 @@ public class MesosTaskManagerParameters {
this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
}
-
- /**
+ /**
* Get the CPU units to use for the TaskManager process.
- */
+ */
public double cpus() {
return cpus;
}
@@ -140,49 +141,53 @@ public class MesosTaskManagerParameters {
/**
* Get the container type (Mesos or Docker). The default is Mesos.
*
- * Mesos provides a facility for a framework to specify which containerizer to use.
- */
+ * <p>Mesos provides a facility for a framework to specify which containerizer to use.
+ */
public ContainerType containerType() {
return containerType;
}
/**
* Get the container image name.
- */
+ */
public Option<String> containerImageName() {
return containerImageName;
}
/**
* Get the common containered parameters.
- */
+ */
public ContaineredTaskManagerParameters containeredParameters() {
return containeredParameters;
}
/**
- * Get the container volumes string
+ * Get the container volumes string.
*/
public List<Protos.Volume> containerVolumes() {
return containerVolumes;
}
/**
- * Get the placement constraints
+ * Get the placement constraints.
*/
public List<ConstraintEvaluator> constraints() {
return constraints;
}
/**
- * Get the taskManager hostname.
- */
- public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+ * Get the taskManager hostname.
+ */
+ public Option<String> getTaskManagerHostname() {
+ return taskManagerHostname;
+ }
/**
- * Get the bootstrap command.
- */
- public Option<String> bootstrapCommand() { return bootstrapCommand; }
+ * Get the bootstrap command.
+ */
+ public Option<String> bootstrapCommand() {
+ return bootstrapCommand;
+ }
@Override
public String toString() {
@@ -200,8 +205,9 @@ public class MesosTaskManagerParameters {
/**
* Create the Mesos TaskManager parameters.
+ *
* @param flinkConfig the TM configuration.
- */
+ */
public static MesosTaskManagerParameters create(Configuration flinkConfig) {
List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
@@ -212,7 +218,7 @@ public class MesosTaskManagerParameters {
flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
- if(cpus <= 0.0) {
+ if (cpus <= 0.0) {
cpus = Math.max(containeredParameters.numSlots(), 1.0);
}
@@ -221,13 +227,13 @@ public class MesosTaskManagerParameters {
ContainerType containerType;
String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
- switch(containerTypeString) {
+ switch (containerTypeString) {
case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
containerType = ContainerType.MESOS;
break;
case MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
containerType = ContainerType.DOCKER;
- if(imageName == null || imageName.length() == 0) {
+ if (imageName == null || imageName.length() == 0) {
throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
" must be specified for docker container type");
}
@@ -250,7 +256,7 @@ public class MesosTaskManagerParameters {
cpus,
containerType,
Option.apply(imageName),
- containeredParameters,
+ containeredParameters,
containerVolumes,
constraints,
tmBootstrapCommand,
@@ -287,7 +293,7 @@ public class MesosTaskManagerParameters {
}
}));
}
-
+
/**
* Used to build volume specs for mesos. This allows for mounting additional volumes into a container
*
@@ -341,6 +347,9 @@ public class MesosTaskManagerParameters {
}
}
+ /**
+ * The supported containerizers.
+ */
public enum ContainerType {
MESOS,
DOCKER
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 625880b..e1b0efa 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,14 +18,6 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -36,14 +28,21 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
-
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
/**
* The entry point for running a TaskManager in a Mesos container.
*/
@@ -59,7 +58,7 @@ public class MesosTaskManagerRunner {
.addOption(BootstrapTools.newDynamicPropertiesOption());
}
- /** The process environment variables */
+ /** The process environment variables. */
private static final Map<String, String> ENV = System.getenv();
public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
index 0ac5f4e..a28020a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/MesosServicesUtils.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.zookeeper.ZooKeeperUtilityFactory;
import org.apache.flink.util.ConfigurationUtil;
+/**
+ * Utilities for the {@link MesosServices}.
+ */
public class MesosServicesUtils {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
index dfbc2c3..aa3157f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/StandaloneMesosServices.java
@@ -35,5 +35,6 @@ public class StandaloneMesosServices implements MesosServices {
}
@Override
- public void close(boolean cleanup) throws Exception {}
+ public void close(boolean cleanup) throws Exception {
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
index d6ff6bc..f1f54ce 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
@@ -19,13 +19,14 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.mesos.Protos;
-import scala.Option;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Objects;
+import scala.Option;
+
import static java.util.Objects.requireNonNull;
/**
@@ -83,7 +84,7 @@ public interface MesosWorkerStore {
/**
* A stored worker.
*
- * The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed
+ * <p>The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed
* by Fenzo for optimization purposes.
*/
class Worker implements Serializable {
@@ -112,28 +113,28 @@ public interface MesosWorkerStore {
/**
* Get the worker's task ID.
- */
+ */
public Protos.TaskID taskID() {
return taskID;
}
/**
* Get the worker's assigned slave ID.
- */
+ */
public Option<Protos.SlaveID> slaveID() {
return slaveID;
}
/**
* Get the worker's assigned hostname.
- */
+ */
public Option<String> hostname() {
return hostname;
}
/**
* Get the worker's state.
- */
+ */
public WorkerState state() {
return state;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
index c6ccbee..b43b89c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java
@@ -19,13 +19,14 @@
package org.apache.flink.mesos.runtime.clusterframework.store;
import org.apache.mesos.Protos;
-import scala.Option;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import scala.Option;
+
/**
* A standalone Mesos worker store.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 4544b8e..92e4416 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -26,11 +26,11 @@ import org.apache.flink.runtime.zookeeper.ZooKeeperSharedValue;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperVersionedValue;
import org.apache.flink.util.FlinkException;
+
import org.apache.mesos.Protos;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,6 +38,8 @@ import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
+import scala.Option;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -53,13 +55,13 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
/** Flag indicating whether this instance is running. */
private boolean isRunning;
- /** A persistent value of the assigned framework ID */
+ /** A persistent value of the assigned framework ID. */
private final ZooKeeperSharedValue frameworkIdInZooKeeper;
- /** A persistent count of all tasks created, for generating unique IDs */
+ /** A persistent count of all tasks created, for generating unique IDs. */
private final ZooKeeperSharedCount totalTaskCountInZooKeeper;
- /** A persistent store of serialized workers */
+ /** A persistent store of serialized workers. */
private final ZooKeeperStateHandleStore<MesosWorkerStore.Worker> workersInZooKeeper;
@SuppressWarnings("unchecked")
@@ -69,7 +71,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
ZooKeeperSharedCount totalTaskCountInZooKeeper) throws Exception {
this.workersInZooKeeper = checkNotNull(workersInZooKeeper, "workersInZooKeeper");
this.frameworkIdInZooKeeper = checkNotNull(frameworkIdInZooKeeper, "frameworkIdInZooKeeper");
- this.totalTaskCountInZooKeeper= checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
+ this.totalTaskCountInZooKeeper = checkNotNull(totalTaskCountInZooKeeper, "totalTaskCountInZooKeeper");
}
@Override
@@ -89,7 +91,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
frameworkIdInZooKeeper.close();
totalTaskCountInZooKeeper.close();
- if(cleanup) {
+ if (cleanup) {
workersInZooKeeper.releaseAndTryRemoveAll();
}
@@ -237,7 +239,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
synchronized (startStopLock) {
verifyIsRunning();
- if(workersInZooKeeper.exists(path) == -1) {
+ if (workersInZooKeeper.exists(path) == -1) {
LOG.debug("No such worker {} in ZooKeeper.", taskID);
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
index b6d3383..0ac05c1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java
@@ -18,8 +18,6 @@
package org.apache.flink.mesos.scheduler;
-import akka.actor.ActorRef;
-
import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
import org.apache.flink.mesos.scheduler.messages.ExecutorLost;
@@ -30,6 +28,8 @@ import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.SlaveLost;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+
+import akka.actor.ActorRef;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
@@ -39,14 +39,14 @@ import java.util.List;
/**
* This class reacts to callbacks from the Mesos scheduler driver.
*
- * In order to preserve actor concurrency safety, this class simply sends
+ * <p>In order to preserve actor concurrency safety, this class simply sends
* corresponding messages to the Mesos resource master actor.
*
- * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ * <p>See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
*/
public class SchedulerProxy implements Scheduler {
- /** The actor to which we report the callbacks */
+ /** The actor to which we report the callbacks. */
private final ActorRef mesosActor;
public SchedulerProxy(ActorRef mesosActor) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
index c841e22..a5653e1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
@@ -25,7 +25,7 @@ import com.netflix.fenzo.functions.Action1;
/**
* A builder for the Fenzo task scheduler.
*
- * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
+ * <p>Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface.
*/
public interface TaskSchedulerBuilder {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
index dc5111d..8ca7e42 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/ResourceOffers.java
@@ -22,6 +22,7 @@ import org.apache.mesos.Protos;
import java.io.Serializable;
import java.util.List;
+
import static java.util.Objects.requireNonNull;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
index a6a26dc..c62ab84 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -19,10 +19,11 @@
package org.apache.flink.mesos.util;
import org.apache.flink.core.fs.Path;
-import scala.Option;
import java.net.URL;
+import scala.Option;
+
/**
* An interface for resolving artifact URIs.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index ae826db..967d818 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -18,6 +18,14 @@
package org.apache.flink.mesos.util;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.net.SSLUtils;
+
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
@@ -42,25 +50,17 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
-
import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.net.SSLUtils;
import org.jets3t.service.utils.Mimetypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -69,6 +69,8 @@ import java.net.URL;
import java.util.HashMap;
import java.util.Map;
+import scala.Option;
+
import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -81,11 +83,10 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
/**
* A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher.
*
- * More information:
+ * <p>More information:
* http://mesos.apache.org/documentation/latest/fetcher/
* http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
*/
@@ -101,7 +102,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
private final URL baseURL;
- private final Map<Path,URL> paths = new HashMap<>();
+ private final Map<Path, URL> paths = new HashMap<>();
private final SSLContext serverSSLContext;
@@ -153,7 +154,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
};
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.bootstrap = new ServerBootstrap();
@@ -169,7 +170,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
String address = bindAddress.getAddress().getHostAddress();
int port = bindAddress.getPort();
- String httpProtocol = (serverSSLContext != null) ? "https": "http";
+ String httpProtocol = (serverSSLContext != null) ? "https" : "http";
baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
@@ -201,7 +202,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
* @param remoteFile the remote path with which to locate the file.
* @return the fully-qualified remote path to the file.
* @throws MalformedURLException if the remote path is invalid.
- */
+ */
public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
return addPath(new Path(localFile.toURI()), new Path(remoteFile));
}
@@ -214,10 +215,10 @@ public class MesosArtifactServer implements MesosArtifactResolver {
* @throws MalformedURLException if the remote path is invalid.
*/
public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
- if(paths.containsKey(remoteFile)) {
+ if (paths.containsKey(remoteFile)) {
throw new IllegalArgumentException("duplicate path registered");
}
- if(remoteFile.isAbsolute()) {
+ if (remoteFile.isAbsolute()) {
throw new IllegalArgumentException("not expecting an absolute path");
}
URL fileURL = new URL(baseURL, remoteFile.toString());
@@ -229,7 +230,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
public synchronized void removePath(Path remoteFile) {
- if(paths.containsKey(remoteFile)) {
+ if (paths.containsKey(remoteFile)) {
URL fileURL = null;
try {
fileURL = new URL(baseURL, remoteFile.toString());
@@ -274,11 +275,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
public VirtualFileServerHandler(Path path) throws IOException {
this.path = path;
- if(!path.isAbsolute()) {
+ if (!path.isAbsolute()) {
throw new IllegalArgumentException("path must be absolute: " + path.toString());
}
this.fs = path.getFileSystem();
- if(!fs.exists(path) || fs.getFileStatus(path).isDir()) {
+ if (!fs.exists(path) || fs.getFileStatus(path).isDir()) {
throw new IllegalArgumentException("no such file: " + path.toString());
}
}
@@ -292,12 +293,11 @@ public class MesosArtifactServer implements MesosArtifactResolver {
LOG.debug("{} request for file '{}'", request.getMethod(), path);
}
- if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
+ if (!(request.getMethod() == GET || request.getMethod() == HEAD)) {
sendMethodNotAllowed(ctx);
return;
}
-
final FileStatus status;
try {
status = fs.getFileStatus(path);
@@ -322,8 +322,7 @@ public class MesosArtifactServer implements MesosArtifactResolver {
final FSDataInputStream stream = fs.open(path);
try {
ctx.write(new ChunkedStream(stream));
- }
- catch(Exception e) {
+ } catch (Exception e) {
stream.close();
throw e;
}
@@ -369,7 +368,6 @@ public class MesosArtifactServer implements MesosArtifactResolver {
}
}
-
/**
* Handle a request for a non-existent file.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
index 40dc41c..7660e9c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java
@@ -23,10 +23,11 @@ import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.slf4j.Logger;
-import scala.Option;
import java.util.Map;
+import scala.Option;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -53,7 +54,7 @@ public class MesosConfiguration {
/**
* The Mesos connection string.
*
- * The value should be in one of the following forms:
+ * <p>The value should be in one of the following forms:
* <pre>
* {@code
* host:port
@@ -62,28 +63,28 @@ public class MesosConfiguration {
* file:///path/to/file (where file contains one of the above)
* }
* </pre>
- */
+ */
public String masterUrl() {
return masterUrl;
}
/**
* The framework registration info.
- */
+ */
public Protos.FrameworkInfo.Builder frameworkInfo() {
return frameworkInfo;
}
/**
* The credential to authenticate the framework principal.
- */
+ */
public Option<Protos.Credential.Builder> credential() {
return credential;
}
/**
* Revise the configuration with updated framework info.
- */
+ */
public MesosConfiguration withFrameworkInfo(Protos.FrameworkInfo.Builder frameworkInfo) {
return new MesosConfiguration(masterUrl, frameworkInfo, credential);
}
@@ -92,11 +93,11 @@ public class MesosConfiguration {
* Create the Mesos scheduler driver based on this configuration.
* @param scheduler the scheduler to use.
* @param implicitAcknowledgements whether to configure the driver for implicit acknowledgements.
- * @return a scheduler driver.
- */
+ * @return a scheduler driver.
+ */
public SchedulerDriver createDriver(Scheduler scheduler, boolean implicitAcknowledgements) {
MesosSchedulerDriver schedulerDriver;
- if(this.credential().isDefined()) {
+ if (this.credential().isDefined()) {
schedulerDriver =
new MesosSchedulerDriver(scheduler, frameworkInfo.build(), this.masterUrl(),
implicitAcknowledgements, this.credential().get().build());
@@ -119,11 +120,11 @@ public class MesosConfiguration {
}
/**
- * A utility method to log relevant Mesos connection info
- */
+ * A utility method to log relevant Mesos connection info.
+ */
public static void logMesosConfig(Logger log, MesosConfiguration config) {
- Map<String,String> env = System.getenv();
+ Map<String, String> env = System.getenv();
Protos.FrameworkInfo.Builder info = config.frameworkInfo();
log.info("--------------------------------------------------------------------------------");
@@ -137,10 +138,10 @@ public class MesosConfiguration {
log.info(" Role: {}", info.hasRole() ? info.getRole() : "(none)");
log.info(" Principal: {}", info.hasPrincipal() ? info.getPrincipal() : "(none)");
log.info(" Host: {}", info.hasHostname() ? info.getHostname() : "(none)");
- if(env.containsKey("LIBPROCESS_IP")) {
+ if (env.containsKey("LIBPROCESS_IP")) {
log.info(" LIBPROCESS_IP: {}", env.get("LIBPROCESS_IP"));
}
- if(env.containsKey("LIBPROCESS_PORT")) {
+ if (env.containsKey("LIBPROCESS_PORT")) {
log.info(" LIBPROCESS_PORT: {}", env.get("LIBPROCESS_PORT"));
}
log.info(" Web UI: {}", info.hasWebuiUrl() ? info.getWebuiUrl() : "(none)");
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
index 6892a65..4211642 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java
@@ -18,9 +18,13 @@
package org.apache.flink.mesos.util;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.configuration.Configuration;
+import org.apache.curator.framework.CuratorFramework;
+
+/**
+ * Utilities for ZooKeeper.
+ */
public class ZooKeeperUtils {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0bca76ed/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7ab4e40..af3f7ef 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -18,29 +18,33 @@
package org.apache.flink.mesos.runtime.clusterframework;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import akka.testkit.TestProbe;
-import com.netflix.fenzo.ConstraintEvaluator;
-import junit.framework.AssertionFailedError;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
-import org.apache.flink.mesos.scheduler.messages.*;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
-import org.apache.flink.runtime.clusterframework.messages.*;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
@@ -50,27 +54,43 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
-import org.apache.mesos.SchedulerDriver;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.testkit.TestProbe;
+import com.netflix.fenzo.ConstraintEvaluator;
+import junit.framework.AssertionFailedError;
import org.apache.mesos.Protos;
import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
-import static java.util.Collections.singletonList;
+import scala.Option;
+import static java.util.Collections.singletonList;
import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractGoalState;
import static org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager.extractResourceID;
-import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* General tests for the Mesos resource manager component.
@@ -125,13 +145,24 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
}
@Override
- protected ActorRef createConnectionMonitor() { return connectionMonitor.ref(); }
+ protected ActorRef createConnectionMonitor() {
+ return connectionMonitor.ref();
+ }
+
@Override
- protected ActorRef createTaskRouter() { return taskRouter.ref(); }
+ protected ActorRef createTaskRouter() {
+ return taskRouter.ref();
+ }
+
@Override
- protected ActorRef createLaunchCoordinator() { return launchCoordinator.ref(); }
+ protected ActorRef createLaunchCoordinator() {
+ return launchCoordinator.ref();
+ }
+
@Override
- protected ActorRef createReconciliationCoordinator() { return reconciliationCoordinator.ref(); }
+ protected ActorRef createReconciliationCoordinator() {
+ return reconciliationCoordinator.ref();
+ }
@Override
protected void fatalError(String message, Throwable error) {
@@ -214,11 +245,11 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
- 1.0,
- MesosTaskManagerParameters.ContainerType.MESOS,
- Option.<String>empty(),
- containeredParams,
- Collections.<Protos.Volume>emptyList(),
+ 1.0,
+ MesosTaskManagerParameters.ContainerType.MESOS,
+ Option.<String>empty(),
+ containeredParams,
+ Collections.<Protos.Volume>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
Option.<String>empty(),
Option.<String>empty());
@@ -301,8 +332,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
register(Collections.<ResourceID>emptyList());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -331,8 +361,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInLaunch, hasEntry(extractResourceID(task1), worker1launched));
register(singletonList(extractResourceID(task1)));
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -364,8 +393,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the internal state was updated
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -409,8 +437,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the instance state was updated
assertThat(resourceManagerInstance.workersBeingReturned.entrySet(), empty());
verify(workerStore).removeWorker(task1);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -442,8 +469,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInNew, hasEntry(extractResourceID(task1), expected));
resourceManagerInstance.taskRouter.expectMsgClass(TaskMonitor.TaskGoalStateUpdated.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(LaunchCoordinator.Launch.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -508,8 +534,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.taskRouter.expectMsg(
new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker1launched)));
verify(schedulerDriver).acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -567,8 +592,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
// verify that the instance state was updated
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
verify(workerStore).newTaskID();
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -604,8 +628,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
assertThat(resourceManagerInstance.workersInLaunch.entrySet(), empty());
expectMsgClass(ResourceRemoved.class);
verify(workerStore).newTaskID();
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -632,8 +655,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
verify(schedulerDriver).stop(false);
verify(workerStore).stop(true);
expectTerminated(resourceManager.actor());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -665,8 +687,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Registered.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(Registered.class);
resourceManagerInstance.taskRouter.expectMsgClass(Registered.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -697,8 +718,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(ReRegistered.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(ReRegistered.class);
resourceManagerInstance.taskRouter.expectMsgClass(ReRegistered.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -726,8 +746,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
resourceManagerInstance.reconciliationCoordinator.expectMsgClass(Disconnected.class);
resourceManagerInstance.launchCoordinator.expectMsgClass(Disconnected.class);
resourceManagerInstance.taskRouter.expectMsgClass(Disconnected.class);
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@@ -752,8 +771,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
watch(resourceManager.actor());
resourceManager.tell(new Error("test"), resourceManager);
expectTerminated(resourceManager.actor());
- }
- catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException(ex);
}
}