You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:20 UTC

[15/16] flink git commit: [FLINK-5974] [mesos] Added configurations to support mesos-dns hostname resolution

[FLINK-5974] [mesos] Added configurations to support mesos-dns hostname resolution

This closes #3692.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e53b75e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e53b75e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e53b75e

Branch: refs/heads/master
Commit: 1e53b75e7df039dd45e7497a353163319ffa6182
Parents: b1f1864
Author: Vijay Srinivasaraghavan <vi...@emc.com>
Authored: Thu Apr 6 09:48:39 2017 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:36:10 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  4 ++
 .../clusterframework/LaunchableMesosWorker.java | 26 +++++++++++--
 .../MesosApplicationMasterRunner.java           | 17 +++++++--
 .../clusterframework/MesosConfigKeys.java       | 10 +++++
 .../MesosFlinkResourceManager.java              |  3 +-
 .../MesosTaskManagerParameters.java             | 40 +++++++++++++++++++-
 .../MesosFlinkResourceManagerTest.java          |  9 ++++-
 7 files changed, 98 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 975a3d4..cafef2e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1286,6 +1286,10 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
 
+	public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
+
+	public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
+
 	/** Default value to override SSL support for the Artifact Server */
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 2408ac6..3e66a5d 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -22,10 +22,12 @@ import com.netflix.fenzo.ConstraintEvaluator;
 import com.netflix.fenzo.TaskAssignmentResult;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.mesos.Utils;
 import org.apache.flink.mesos.scheduler.LaunchableTask;
 import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.util.Preconditions;
@@ -64,6 +66,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	private final MesosTaskManagerParameters params;
 	private final Protos.TaskID taskID;
 	private final Request taskRequest;
+	private final MesosConfiguration mesosConfiguration;
 
 	/**
 	 * Construct a launchable Mesos worker.
@@ -76,11 +79,14 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			MesosArtifactResolver resolver,
 			MesosTaskManagerParameters params,
 			ContainerSpecification containerSpec,
-			Protos.TaskID taskID) {
+			Protos.TaskID taskID,
+			MesosConfiguration mesosConfiguration) {
 		this.resolver = Preconditions.checkNotNull(resolver);
-		this.params = Preconditions.checkNotNull(params);
 		this.containerSpec = Preconditions.checkNotNull(containerSpec);
+		this.params = Preconditions.checkNotNull(params);
 		this.taskID = Preconditions.checkNotNull(taskID);
+		this.mesosConfiguration = Preconditions.checkNotNull(mesosConfiguration);
+		
 		this.taskRequest = new Request();
 	}
 
@@ -193,6 +199,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
 		final StringBuilder jvmArgs = new StringBuilder();
 
+		// set the task manager hostname if an override is configured; the "_TASK" placeholder is replaced with the Mesos task ID
+		if(params.getTaskManagerHostname().isDefined()) {
+			final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
+			dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
+		}
+
 		// use the assigned ports for the TM
 		if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
 			throw new IllegalArgumentException("unsufficient # of ports assigned");
@@ -234,8 +246,16 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		// finalize JVM args
 		env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
 
+		// populate TASK_NAME and FRAMEWORK_NAME environment variables to the TM container
+		env.addVariables(variable(MesosConfigKeys.ENV_TASK_NAME, taskInfo.getTaskId().getValue()));
+		env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
+
 		// build the launch command w/ dynamic application properties
-		StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+		StringBuilder launchCommand = new StringBuilder();
+		if(params.bootstrapCommand().isDefined()) {
+			launchCommand.append(params.bootstrapCommand().get()).append(" && ");
+		}
+		launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
 		launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
 		cmd.setValue(launchCommand.toString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 5513df4..1cedcc3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -210,10 +210,19 @@ public class MesosApplicationMasterRunner {
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
-			// Note that we use the "appMasterHostname" given by the system, to make sure
-			// we use the hostnames consistently throughout akka.
-			// for akka "localhost" and "localhost.localdomain" are different actors.
-			final String appMasterHostname = InetAddress.getLocalHost().getHostName();
+			final String appMasterHostname;
+			//We will use JM RPC address property if it is supplied through configuration
+			final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+			if(jmRpcAddress != null) {
+				LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
+				appMasterHostname = jmRpcAddress;
+			} else {
+				// Note that we use the "appMasterHostname" given by the system, to make sure
+				// we use the hostnames consistently throughout akka.
+				// for akka "localhost" and "localhost.localdomain" are different actors.
+				appMasterHostname = InetAddress.getLocalHost().getHostName();
+			}
+			LOG.info("App Master Hostname to use: {}", appMasterHostname);
 
 			// Mesos configuration
 			final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index ebd9af5..35da95f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -41,6 +41,16 @@ public class MesosConfigKeys {
 	 */
 	public static final String ENV_JVM_ARGS = "JVM_ARGS";
 
+	/**
+	 * Name of the environment variable (standard in DCOS environments) that passes the Mesos task ID to the TaskManager container.
+	 */
+	public static final String ENV_TASK_NAME = "TASK_NAME";
+
+	/**
+ 	 * Name of the environment variable (standard in DCOS environments) that passes the Mesos framework name to the TaskManager container.
+ 	 */
+	public static final String ENV_FRAMEWORK_NAME = "FRAMEWORK_NAME";
+
 	/** Private constructor to prevent instantiation */
 	private MesosConfigKeys() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index a7321a3..a9ff6cb 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -669,7 +669,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
 		LaunchableMesosWorker launchable =
-			new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
+			new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
+					taskID, mesosConfig);
 		return launchable;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index aaab027..b3b8162 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -64,6 +64,14 @@ public class MesosTaskManagerParameters {
 		key("mesos.resourcemanager.tasks.container.image.name")
 			.noDefaultValue();
 
+	public static final ConfigOption<String> MESOS_TM_HOSTNAME =
+			key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
+			.noDefaultValue();
+
+	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
+			key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
+			.noDefaultValue();
+	
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
 		key("mesos.resourcemanager.tasks.container.volumes")
 		.noDefaultValue();
@@ -92,6 +100,10 @@ public class MesosTaskManagerParameters {
 	private final List<Protos.Volume> containerVolumes;
 	
 	private final List<ConstraintEvaluator> constraints;
+	
+	private final Option<String> bootstrapCommand;
+
+	private final Option<String> taskManagerHostname;
 
 	public MesosTaskManagerParameters(
 			double cpus,
@@ -99,7 +111,9 @@ public class MesosTaskManagerParameters {
 			Option<String> containerImageName,
 			ContaineredTaskManagerParameters containeredParameters,
 			List<Protos.Volume> containerVolumes,
-			List<ConstraintEvaluator> constraints) {
+			List<ConstraintEvaluator> constraints,
+			Option<String> bootstrapCommand,
+			Option<String> taskManagerHostname) {
 
 		this.cpus = cpus;
 		this.containerType = Preconditions.checkNotNull(containerType);
@@ -107,6 +121,8 @@ public class MesosTaskManagerParameters {
 		this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
 		this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
 		this.constraints = Preconditions.checkNotNull(constraints);
+		this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
+		this.taskManagerHostname = Preconditions.checkNotNull(taskManagerHostname);
 	}
 
 
@@ -154,6 +170,16 @@ public class MesosTaskManagerParameters {
 		return constraints;
 	}
 
+	/**
+ 	 * Get the taskManager hostname.
+ 	 */
+	public Option<String> getTaskManagerHostname() { return taskManagerHostname; }
+
+	/**
+ 	 * Get the bootstrap command.
+ 	 */
+	public Option<String> bootstrapCommand() { return bootstrapCommand;	}	
+
 	@Override
 	public String toString() {
 		return "MesosTaskManagerParameters{" +
@@ -163,6 +189,8 @@ public class MesosTaskManagerParameters {
 			", containeredParameters=" + containeredParameters +
 			", containerVolumes=" + containerVolumes +
 			", constraints=" + constraints +
+			", taskManagerHostName=" + taskManagerHostname +
+			", bootstrapCommand=" + bootstrapCommand +
 			'}';
 	}
 
@@ -208,13 +236,21 @@ public class MesosTaskManagerParameters {
 
 		List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
 
+		//obtain Task Manager Host Name from the configuration
+		Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
+
+		//obtain bootstrap command from the configuration
+		Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
+
 		return new MesosTaskManagerParameters(
 			cpus,
 			containerType,
 			Option.apply(imageName),
 			containeredParameters,			
 			containerVolumes,
-			constraints);
+			constraints,
+			tmBootstrapCommand,
+			tmHostname);
 	}
 
 	private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1e53b75e/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index 7fe5db5..7ab4e40 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -214,7 +214,14 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 			ContaineredTaskManagerParameters containeredParams =
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
-				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList());
+				1.0, 
+				MesosTaskManagerParameters.ContainerType.MESOS, 
+				Option.<String>empty(), 
+				containeredParams, 
+				Collections.<Protos.Volume>emptyList(), 
+				Collections.<ConstraintEvaluator>emptyList(),
+				Option.<String>empty(),
+				Option.<String>empty());
 
 			TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
 				TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(