You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:20 UTC
[15/16] flink git commit: [FLINK-5974] [mesos] Added configurations
to support mesos-dns hostname resolution
[FLINK-5974] [mesos] Added configurations to support mesos-dns hostname resolution
This closes #3692.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e53b75e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e53b75e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e53b75e
Branch: refs/heads/master
Commit: 1e53b75e7df039dd45e7497a353163319ffa6182
Parents: b1f1864
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Thu Apr 6 09:48:39 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:36:10 2017 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 4 ++
.../clusterframework/LaunchableMesosWorker.java | 26 +++++++++++--
.../MesosApplicationMasterRunner.java | 17 +++++++--
.../clusterframework/MesosConfigKeys.java | 10 +++++
.../MesosFlinkResourceManager.java | 3 +-
.../MesosTaskManagerParameters.java | 40 +++++++++++++++++++-
.../MesosFlinkResourceManagerTest.java | 9 ++++-
7 files changed, 98 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 975a3d4..cafef2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1286,6 +1286,10 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
+ public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
+
+ public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
+
/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 2408ac6..3e66a5d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -22,10 +22,12 @@ import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.util.Preconditions;
@@ -64,6 +66,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
private final MesosTaskManagerParameters params;
private final Protos.TaskID taskID;
private final Request taskRequest;
+ private final MesosConfiguration mesosConfiguration;
/**
* Construct a launchable Mesos worker.
@@ -76,11 +79,14 @@ public class LaunchableMesosWorker implements LaunchableTask {
MesosArtifactResolver resolver,
MesosTaskManagerParameters params,
ContainerSpecification containerSpec,
- Protos.TaskID taskID) {
+ Protos.TaskID taskID,
+ MesosConfiguration mesosConfiguration) {
this.resolver = Preconditions.checkNotNull(resolver);
- this.params = Preconditions.checkNotNull(params);
this.containerSpec = Preconditions.checkNotNull(containerSpec);
+ this.params = Preconditions.checkNotNull(params);
this.taskID = Preconditions.checkNotNull(taskID);
+ this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
+
this.taskRequest = new Request();
}
@@ -193,6 +199,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
final StringBuilder jvmArgs = new StringBuilder();
+ //configure task manager hostname property if hostname override property is supplied
+ if(params.getTaskManagerHostname().isDefined()) {
+ final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
+ dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
+ }
+
// use the assigned ports for the TM
if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
throw new IllegalArgumentException("unsufficient # of ports assigned");
@@ -234,8 +246,16 @@ public class LaunchableMesosWorker implements LaunchableTask {
// finalize JVM args
env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
+ // populate TASK_NAME and FRAMEWORK_NAME environment variables to the TM container
+ env.addVariables(variable(MesosConfigKeys.ENV_TASK_NAME, taskInfo.getTaskId().getValue()));
+ env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
+
// build the launch command w/ dynamic application properties
- StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+ StringBuilder launchCommand = new StringBuilder();
+ if(params.bootstrapCommand().isDefined()) {
+ launchCommand.append(params.bootstrapCommand().get()).append(" && ");
+ }
+ launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
cmd.setValue(launchCommand.toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5513df4..1cedcc3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -210,10 +210,19 @@ public class MesosApplicationMasterRunner {
try {
// ------- (1) load and parse / validate all configurations -------
- // Note that we use the "appMasterHostname" given by the system, to make sure
- // we use the hostnames consistently throughout akka.
- // for akka "localhost" and "localhost.localdomain" are different actors.
- final String appMasterHostname = InetAddress.getLocalHost().getHostName();
+ final String appMasterHostname;
+ //We will use JM RPC address property if it is supplied through configuration
+ final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+ if(jmRpcAddress != null) {
+ LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
+ appMasterHostname = jmRpcAddress;
+ } else {
+ // Note that we use the "appMasterHostname" given by the system, to make sure
+ // we use the hostnames consistently throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are different actors.
+ appMasterHostname = InetAddress.getLocalHost().getHostName();
+ }
+ LOG.info("App Master Hostname to use: {}", appMasterHostname);
// Mesos configuration
final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index ebd9af5..35da95f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -41,6 +41,16 @@ public class MesosConfigKeys {
*/
public static final String ENV_JVM_ARGS = "JVM_ARGS";
+ /**
+ * Standard environment variables used in DCOS environment
+ */
+ public static final String ENV_TASK_NAME = "TASK_NAME";
+
+ /**
+ * Standard environment variables used in DCOS environment
+ */
+ public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
+
/** Private constructor to prevent instantiation */
private MesosConfigKeys() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index a7321a3..a9ff6cb 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -669,7 +669,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LaunchableMesosWorker launchable =
- new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
+ new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
+ taskID, mesosConfig);
return launchable;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index aaab027..b3b8162 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -64,6 +64,14 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.container.image.name")
.noDefaultValue();
+ public static final ConfigOption<String> MESOS_TM_HOSTNAME =
+ key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
+ .noDefaultValue();
+
+ public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
+ key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
+ .noDefaultValue();
+
public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();
@@ -92,6 +100,10 @@ public class MesosTaskManagerParameters {
private final List<Protos.Volume> containerVolumes;
private final List<ConstraintEvaluator> constraints;
+
+ private final Option<String> bootstrapCommand;
+
+ private final Option<String> taskManagerHostname;
public MesosTaskManagerParameters(
double cpus,
@@ -99,7 +111,9 @@ public class MesosTaskManagerParameters {
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
List<Protos.Volume> containerVolumes,
- List<ConstraintEvaluator> constraints) {
+ List<ConstraintEvaluator> constraints,
+ Option<String> bootstrapCommand,
+ Option<String> taskManagerHostname) {
this.cpus = cpus;
this.containerType = Preconditions.checkNotNull(containerType);
@@ -107,6 +121,8 @@ public class MesosTaskManagerParameters {
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
this.constraints = Preconditions.checkNotNull(constraints);
+ this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
+ this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
}
@@ -154,6 +170,16 @@ public class MesosTaskManagerParameters {
return constraints;
}
+ /**
+ * Get the taskManager hostname.
+ */
+ public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+
+ /**
+ * Get the bootstrap command.
+ */
+ public Option<String> bootstrapCommand() { return bootstrapCommand; }
+
@Override
public String toString() {
return "MesosTaskManagerParameters{" +
@@ -163,6 +189,8 @@ public class MesosTaskManagerParameters {
", containeredParameters=" + containeredParameters +
", containerVolumes=" + containerVolumes +
", constraints=" + constraints +
+ ", taskManagerHostName=" + taskManagerHostname +
+ ", bootstrapCommand=" + bootstrapCommand +
'}';
}
@@ -208,13 +236,21 @@ public class MesosTaskManagerParameters {
List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
+ //obtain Task Manager Host Name from the configuration
+ Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
+
+ //obtain bootstrap command from the configuration
+ Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
+
return new MesosTaskManagerParameters(
cpus,
containerType,
Option.apply(imageName),
containeredParameters,
containerVolumes,
- constraints);
+ constraints,
+ tmBootstrapCommand,
+ tmHostname);
}
private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7fe5db5..7ab4e40 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -214,7 +214,14 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
- 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList());
+ 1.0,
+ MesosTaskManagerParameters.ContainerType.MESOS,
+ Option.<String>empty(),
+ containeredParams,
+ Collections.<Protos.Volume>emptyList(),
+ Collections.<ConstraintEvaluator>emptyList(),
+ Option.<String>empty(),
+ Option.<String>empty());
TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(