You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:21 UTC
[16/16] flink git commit: [FLINK-5974] [mesos] Make mesos service
name configurable for dns lookups
[FLINK-5974] [mesos] Make mesos service name configurable for dns lookups
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7364fff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7364fff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7364fff
Branch: refs/heads/master
Commit: d7364fffbf552aed79e537a7aec3af593cb4e159
Parents: 1e53b75
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 3 16:48:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:36:11 2017 +0200
----------------------------------------------------------------------
docs/setup/mesos.md | 4 +++
.../flink/configuration/ConfigConstants.java | 4 ---
.../clusterframework/LaunchableMesosWorker.java | 27 ++++++++++------
.../MesosApplicationMasterRunner.java | 17 +++-------
.../MesosFlinkResourceManager.java | 9 ++++--
.../MesosTaskManagerParameters.java | 34 +++++++++++---------
6 files changed, 52 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/docs/setup/mesos.md
----------------------------------------------------------------------
diff --git a/docs/setup/mesos.md b/docs/setup/mesos.md
index 4c72fbc..674aa92 100644
--- a/docs/setup/mesos.md
+++ b/docs/setup/mesos.md
@@ -263,3 +263,7 @@ May be set to -1 to disable this feature.
`mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
`mesos.resourcemanager.tasks.container.volumes`: A comma seperated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
+
+`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
+
+`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index cafef2e..975a3d4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1286,10 +1286,6 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
- public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
-
- public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
-
/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 3e66a5d..04a406f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -34,11 +34,13 @@ import org.apache.flink.util.Preconditions;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
import static org.apache.flink.mesos.Utils.variable;
import static org.apache.flink.mesos.Utils.range;
@@ -200,9 +202,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
final StringBuilder jvmArgs = new StringBuilder();
//configure task manager hostname property if hostname override property is supplied
- if(params.getTaskManagerHostname().isDefined()) {
- final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
- dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
+ Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
+
+ if(taskManagerHostnameOption.isDefined()) {
+ // replace the TASK_ID pattern by the actual task id value of the Mesos task
+ final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
+ .matcher(taskManagerHostnameOption.get())
+ .replaceAll(Matcher.quoteReplacement(taskID.getValue()));
+
+ dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
}
// use the assigned ports for the TM
@@ -251,13 +259,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
// build the launch command w/ dynamic application properties
- StringBuilder launchCommand = new StringBuilder();
- if(params.bootstrapCommand().isDefined()) {
- launchCommand.append(params.bootstrapCommand().get()).append(" && ");
- }
- launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
- launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
- cmd.setValue(launchCommand.toString());
+ Option<String> bootstrapCmdOption = params.bootstrapCommand();
+
+ final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : "";
+ final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties);
+
+ cmd.setValue(launchCommand);
// build the container info
Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 1cedcc3..09ef380 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
@@ -210,18 +211,10 @@ public class MesosApplicationMasterRunner {
try {
// ------- (1) load and parse / validate all configurations -------
- final String appMasterHostname;
- //We will use JM RPC address property if it is supplied through configuration
- final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- if(jmRpcAddress != null) {
- LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
- appMasterHostname = jmRpcAddress;
- } else {
- // Note that we use the "appMasterHostname" given by the system, to make sure
- // we use the hostnames consistently throughout akka.
- // for akka "localhost" and "localhost.localdomain" are different actors.
- appMasterHostname = InetAddress.getLocalHost().getHostName();
- }
+ final String appMasterHostname = config.getString(
+ JobManagerOptions.ADDRESS,
+ InetAddress.getLocalHost().getHostName());
+
LOG.info("App Master Hostname to use: {}", appMasterHostname);
// Mesos configuration
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index a9ff6cb..17ffef7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -669,8 +669,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LaunchableMesosWorker launchable =
- new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
- taskID, mesosConfig);
+ new LaunchableMesosWorker(
+ artifactResolver,
+ taskManagerParameters,
+ taskManagerContainerSpec,
+ taskID,
+ mesosConfig);
+
return launchable;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index b3b8162..4324469 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -33,6 +33,7 @@ import scala.Option;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.regex.Pattern;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -44,33 +45,36 @@ import static org.apache.flink.configuration.ConfigOptions.key;
*/
public class MesosTaskManagerParameters {
+ /** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+ public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
+
public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
- key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
- .defaultValue(1);
+ key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+ .defaultValue(1);
public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
- key("mesos.resourcemanager.tasks.mem")
- .defaultValue(1024);
+ key("mesos.resourcemanager.tasks.mem")
+ .defaultValue(1024);
public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
- key("mesos.resourcemanager.tasks.cpus")
- .defaultValue(0.0);
+ key("mesos.resourcemanager.tasks.cpus")
+ .defaultValue(0.0);
public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
key("mesos.resourcemanager.tasks.container.type")
- .defaultValue("mesos");
+ .defaultValue("mesos");
public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
key("mesos.resourcemanager.tasks.container.image.name")
- .noDefaultValue();
+ .noDefaultValue();
public static final ConfigOption<String> MESOS_TM_HOSTNAME =
- key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
- .noDefaultValue();
+ key("mesos.resourcemanager.tasks.hostname")
+ .noDefaultValue();
public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
- key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
- .noDefaultValue();
+ key("mesos.resourcemanager.tasks.bootstrap-cmd")
+ .noDefaultValue();
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
@@ -78,7 +82,7 @@ public class MesosTaskManagerParameters {
public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
- .noDefaultValue();
+ .noDefaultValue();
/**
* Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.
@@ -237,7 +241,7 @@ public class MesosTaskManagerParameters {
List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
//obtain Task Manager Host Name from the configuration
- Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
+ Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
//obtain bootstrap command from the configuration
Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
@@ -250,7 +254,7 @@ public class MesosTaskManagerParameters {
containerVolumes,
constraints,
tmBootstrapCommand,
- tmHostname);
+ taskManagerHostname);
}
private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {