You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/05 11:48:21 UTC

[16/16] flink git commit: [FLINK-5974] [mesos] Make mesos service name configurable for dns lookups

[FLINK-5974] [mesos] Make mesos service name configurable for dns lookups


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7364fff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7364fff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7364fff

Branch: refs/heads/master
Commit: d7364fffbf552aed79e537a7aec3af593cb4e159
Parents: 1e53b75
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 3 16:48:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 5 11:36:11 2017 +0200

----------------------------------------------------------------------
 docs/setup/mesos.md                             |  4 +++
 .../flink/configuration/ConfigConstants.java    |  4 ---
 .../clusterframework/LaunchableMesosWorker.java | 27 ++++++++++------
 .../MesosApplicationMasterRunner.java           | 17 +++-------
 .../MesosFlinkResourceManager.java              |  9 ++++--
 .../MesosTaskManagerParameters.java             | 34 +++++++++++---------
 6 files changed, 52 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/docs/setup/mesos.md
----------------------------------------------------------------------
diff --git a/docs/setup/mesos.md b/docs/setup/mesos.md
index 4c72fbc..674aa92 100644
--- a/docs/setup/mesos.md
+++ b/docs/setup/mesos.md
@@ -263,3 +263,7 @@ May be set to -1 to disable this feature.
 `mesos.resourcemanager.tasks.container.image.name`: Image name to use for the container (**NO DEFAULT**)
 
 `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
+
+`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
+
+`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).

http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index cafef2e..975a3d4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -1286,10 +1286,6 @@ public final class ConfigConstants {
 
 	public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
 
-	public static final String MESOS_RESOURCEMANAGER_TASKS_HOSTNAME = "mesos.resourcemanager.tasks.hostname";
-
-	public static final String MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD = "mesos.resourcemanager.tasks.cmd-prefix";
-
 	/** Default value to override SSL support for the Artifact Server */
 	public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 3e66a5d..04a406f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -34,11 +34,13 @@ import org.apache.flink.util.Preconditions;
 import org.apache.mesos.Protos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
 
 import static org.apache.flink.mesos.Utils.variable;
 import static org.apache.flink.mesos.Utils.range;
@@ -200,9 +202,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		final StringBuilder jvmArgs = new StringBuilder();
 
 		//configure task manager hostname property if hostname override property is supplied
-		if(params.getTaskManagerHostname().isDefined()) {
-			final String taskManagerHostName = params.getTaskManagerHostname().get().replace("_TASK",taskID.getValue());
-			dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostName);
+		Option<String> taskManagerHostnameOption = params.getTaskManagerHostname();
+
+		if(taskManagerHostnameOption.isDefined()) {
+			// replace the TASK_ID pattern by the actual task id value of the Mesos task
+			final String taskManagerHostname = MesosTaskManagerParameters.TASK_ID_PATTERN
+				.matcher(taskManagerHostnameOption.get())
+				.replaceAll(Matcher.quoteReplacement(taskID.getValue()));
+
+			dynamicProperties.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, taskManagerHostname);
 		}
 
 		// use the assigned ports for the TM
@@ -251,13 +259,12 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		env.addVariables(variable(MesosConfigKeys.ENV_FRAMEWORK_NAME, mesosConfiguration.frameworkInfo().getName()));
 
 		// build the launch command w/ dynamic application properties
-		StringBuilder launchCommand = new StringBuilder();
-		if(params.bootstrapCommand().isDefined()) {
-			launchCommand.append(params.bootstrapCommand().get()).append(" && ");
-		}
-		launchCommand.append("$FLINK_HOME/bin/mesos-taskmanager.sh ");
-		launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
-		cmd.setValue(launchCommand.toString());
+		Option<String> bootstrapCmdOption = params.bootstrapCommand();
+
+		final String bootstrapCommand = bootstrapCmdOption.isDefined() ? bootstrapCmdOption.get() + " && " : "";
+		final String launchCommand = bootstrapCommand + "$FLINK_HOME/bin/mesos-taskmanager.sh " + ContainerSpecification.formatSystemProperties(dynamicProperties);
+
+		cmd.setValue(launchCommand);
 
 		// build the container info
 		Protos.ContainerInfo.Builder containerInfo = Protos.ContainerInfo.newBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 1cedcc3..09ef380 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
@@ -210,18 +211,10 @@ public class MesosApplicationMasterRunner {
 		try {
 			// ------- (1) load and parse / validate all configurations -------
 
-			final String appMasterHostname;
-			//We will use JM RPC address property if it is supplied through configuration
-			final String jmRpcAddress = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-			if(jmRpcAddress != null) {
-				LOG.info("JM RPC address from Flink configuration file: {} ", jmRpcAddress);
-				appMasterHostname = jmRpcAddress;
-			} else {
-				// Note that we use the "appMasterHostname" given by the system, to make sure
-				// we use the hostnames consistently throughout akka.
-				// for akka "localhost" and "localhost.localdomain" are different actors.
-				appMasterHostname = InetAddress.getLocalHost().getHostName();
-			}
+			final String appMasterHostname = config.getString(
+				JobManagerOptions.ADDRESS,
+				InetAddress.getLocalHost().getHostName());
+
 			LOG.info("App Master Hostname to use: {}", appMasterHostname);
 
 			// Mesos configuration

http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index a9ff6cb..17ffef7 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -669,8 +669,13 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
 
 	private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
 		LaunchableMesosWorker launchable =
-			new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec,
-					taskID, mesosConfig);
+			new LaunchableMesosWorker(
+				artifactResolver,
+				taskManagerParameters,
+				taskManagerContainerSpec,
+				taskID,
+				mesosConfig);
+
 		return launchable;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d7364fff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index b3b8162..4324469 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -33,6 +33,7 @@ import scala.Option;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -44,33 +45,36 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  */
 public class MesosTaskManagerParameters {
 
+	/** Pattern replaced in the {@link #MESOS_TM_HOSTNAME} by the actual task id of the Mesos task */
+	public static final Pattern TASK_ID_PATTERN = Pattern.compile("_TASK_", Pattern.LITERAL);
+
 	public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
-			key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
-			.defaultValue(1);
+		key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+		.defaultValue(1);
 
 	public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
-			key("mesos.resourcemanager.tasks.mem")
-			.defaultValue(1024);
+		key("mesos.resourcemanager.tasks.mem")
+		.defaultValue(1024);
 
 	public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
-			key("mesos.resourcemanager.tasks.cpus")
-			.defaultValue(0.0);
+		key("mesos.resourcemanager.tasks.cpus")
+		.defaultValue(0.0);
 
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
 		key("mesos.resourcemanager.tasks.container.type")
-			.defaultValue("mesos");
+		.defaultValue("mesos");
 
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
 		key("mesos.resourcemanager.tasks.container.image.name")
-			.noDefaultValue();
+		.noDefaultValue();
 
 	public static final ConfigOption<String> MESOS_TM_HOSTNAME =
-			key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_HOSTNAME)
-			.noDefaultValue();
+		key("mesos.resourcemanager.tasks.hostname")
+		.noDefaultValue();
 
 	public static final ConfigOption<String> MESOS_TM_BOOTSTRAP_CMD =
-			key(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_BOOTSTRAP_CMD)
-			.noDefaultValue();
+		key("mesos.resourcemanager.tasks.bootstrap-cmd")
+		.noDefaultValue();
 	
 	public static final ConfigOption<String> MESOS_RM_CONTAINER_VOLUMES =
 		key("mesos.resourcemanager.tasks.container.volumes")
@@ -78,7 +82,7 @@ public class MesosTaskManagerParameters {
 	
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
-			.noDefaultValue();
+		.noDefaultValue();
 
 	/**
 	 * Value for {@code MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE} setting. Tells to use the Mesos containerizer.
@@ -237,7 +241,7 @@ public class MesosTaskManagerParameters {
 		List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
 
 		//obtain Task Manager Host Name from the configuration
-		Option<String> tmHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
+		Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
 		//obtain bootstrap command from the configuration
 		Option<String> tmBootstrapCommand = Option.apply(flinkConfig.getString(MESOS_TM_BOOTSTRAP_CMD));
@@ -250,7 +254,7 @@ public class MesosTaskManagerParameters {
 			containerVolumes,
 			constraints,
 			tmBootstrapCommand,
-			tmHostname);
+			taskManagerHostname);
 	}
 
 	private static List<ConstraintEvaluator> parseConstraints(String mesosConstraints) {