You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/26 14:41:33 UTC

[07/16] flink git commit: [FLINK-8490] Allow custom docker parameters for docker tasks on Mesos

[FLINK-8490] Allow custom docker parameters for docker tasks on Mesos

This closes #5346.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6969fe2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6969fe2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6969fe2f

Branch: refs/heads/master
Commit: 6969fe2fa823be7748cee002a32df02fd1cae09f
Parents: a6d7f2d
Author: Leonid Ishimnikov <li...@users.noreply.github.com>
Authored: Tue Jan 23 13:55:17 2018 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 26 13:50:21 2018 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |  2 +
 docs/ops/deployment/mesos.md                    |  2 +
 .../clusterframework/LaunchableMesosWorker.java |  1 +
 .../clusterframework/MesosResourceManager.java  |  1 +
 .../MesosTaskManagerParameters.java             | 47 ++++++++++++++++++++
 .../MesosFlinkResourceManagerTest.java          |  1 +
 .../MesosResourceManagerTest.java               |  3 +-
 .../MesosTaskManagerParametersTest.java         | 36 +++++++++++++++
 8 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 8a4b1c1..43f281b 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -512,6 +512,8 @@ May be set to -1 to disable this feature.
 
 - `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
 
+- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed to the `docker run` command when using the Docker containerizer. A comma-separated list of `key=value` pairs; the `value` may itself contain '='. (**NO DEFAULT**)
+
 - `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information.
 
 ### High Availability (HA)

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 5771abe..4d08104 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -264,6 +264,8 @@ May be set to -1 to disable this feature.
 
 `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
 
+`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed to the `docker run` command when using the Docker containerizer. A comma-separated list of `key=value` pairs; the `value` may itself contain '='. (**NO DEFAULT**)
+
 `mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
 
 `mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index e71c703..b4176a8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -298,6 +298,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 				containerInfo
 					.setType(Protos.ContainerInfo.Type.DOCKER)
 					.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+						.addAllParameters(params.dockerParameters())
 						.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
 						.setImage(params.containerImageName().get()));
 				break;

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 8b67257..d76f2fe 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -672,6 +672,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 				1,
 				new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
 			taskManagerParameters.containerVolumes(),
+			taskManagerParameters.dockerParameters(),
 			taskManagerParameters.constraints(),
 			taskManagerParameters.command(),
 			taskManagerParameters.bootstrapCommand(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 3859913..f0ba113 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -86,6 +86,10 @@ public class MesosTaskManagerParameters {
 		key("mesos.resourcemanager.tasks.container.volumes")
 		.noDefaultValue();
 
+	public static final ConfigOption<String> MESOS_RM_CONTAINER_DOCKER_PARAMETERS =
+		key("mesos.resourcemanager.tasks.container.docker.parameters")
+		.noDefaultValue();
+
 	public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
 		key("mesos.constraints.hard.hostattribute")
 		.noDefaultValue();
@@ -109,6 +113,8 @@ public class MesosTaskManagerParameters {
 
 	private final List<Protos.Volume> containerVolumes;
 
+	private final List<Protos.Parameter> dockerParameters;
+
 	private final List<ConstraintEvaluator> constraints;
 
 	private final String command;
@@ -123,6 +129,7 @@ public class MesosTaskManagerParameters {
 			Option<String> containerImageName,
 			ContaineredTaskManagerParameters containeredParameters,
 			List<Protos.Volume> containerVolumes,
+			List<Protos.Parameter> dockerParameters,
 			List<ConstraintEvaluator> constraints,
 			String command,
 			Option<String> bootstrapCommand,
@@ -133,6 +140,7 @@ public class MesosTaskManagerParameters {
 		this.containerImageName = Preconditions.checkNotNull(containerImageName);
 		this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
 		this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
+		this.dockerParameters = Preconditions.checkNotNull(dockerParameters);
 		this.constraints = Preconditions.checkNotNull(constraints);
 		this.command = Preconditions.checkNotNull(command);
 		this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
@@ -177,6 +185,13 @@ public class MesosTaskManagerParameters {
 	}
 
 	/**
+	 * Get Docker runtime parameters.
+	 */
+	public List<Protos.Parameter> dockerParameters() {
+		return dockerParameters;
+	}
+
+	/**
 	 * Get the placement constraints.
 	 */
 	public List<ConstraintEvaluator> constraints() {
@@ -212,6 +227,7 @@ public class MesosTaskManagerParameters {
 			", containerImageName=" + containerImageName +
 			", containeredParameters=" + containeredParameters +
 			", containerVolumes=" + containerVolumes +
+			", dockerParameters=" + dockerParameters +
 			", constraints=" + constraints +
 			", taskManagerHostName=" + taskManagerHostname +
 			", command=" + command +
@@ -260,8 +276,12 @@ public class MesosTaskManagerParameters {
 
 		Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
 
+		Option<String> dockerParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS));
+
 		List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
 
+		List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt);
+
 		//obtain Task Manager Host Name from the configuration
 		Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
 
@@ -275,6 +295,7 @@ public class MesosTaskManagerParameters {
 			Option.apply(imageName),
 			containeredParameters,
 			containerVolumes,
+			dockerParameters,
 			constraints,
 			tmCommand,
 			tmBootstrapCommand,
@@ -365,6 +386,32 @@ public class MesosTaskManagerParameters {
 		}
 	}
 
+	public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+		if (dockerParameters.isEmpty()) {
+			return Collections.emptyList();
+		} else {
+			String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+			List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+			for (String dockerParameterSpecification : dockerParameterSpecifications) {
+				if (!dockerParameterSpecification.trim().isEmpty()) {
+					// split with the limit of 2 in case the value includes '='
+					String[] match = dockerParameterSpecification.split("=", 2);
+					if (match.length != 2) {
+						throw new IllegalArgumentException("Docker parameter specification is invalid, given: "
+							+ dockerParameterSpecification);
+					}
+					Protos.Parameter.Builder parameter = Protos.Parameter.newBuilder();
+					parameter.setKey(match[0]);
+					parameter.setValue(match[1]);
+					parameters.add(parameter.build());
+				}
+			}
+			return parameters;
+		}
+	}
+
 	/**
 	 * The supported containerizers.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index ff32486..c2455ec 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -250,6 +250,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
 				Option.<String>empty(),
 				containeredParams,
 				Collections.<Protos.Volume>emptyList(),
+				Collections.<Protos.Parameter>emptyList(),
 				Collections.<ConstraintEvaluator>emptyList(),
 				"",
 				Option.<String>empty(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 85ba142..14e8ed9 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -263,7 +263,8 @@ public class MesosResourceManagerTest extends TestLogger {
 				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
 				1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
-				Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
+				Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(),
+				Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
 				Option.<String>empty());
 
 			// resource manager

http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index 1f33cb5..84b0ff2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -85,6 +85,42 @@ public class MesosTaskManagerParametersTest extends TestLogger {
 	}
 
 	@Test
+	public void testContainerDockerParameter() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue");
+
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		assertEquals(params.dockerParameters().size(), 1);
+		assertEquals(params.dockerParameters().get(0).getKey(), "testKey");
+		assertEquals(params.dockerParameters().get(0).getValue(), "testValue");
+	}
+
+	@Test
+	public void testContainerDockerParameters() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
+				"testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
+
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+		assertEquals(params.dockerParameters().size(), 4);
+		assertEquals(params.dockerParameters().get(0).getKey(), "testKey1");
+		assertEquals(params.dockerParameters().get(0).getValue(), "testValue1");
+		assertEquals(params.dockerParameters().get(1).getKey(), "testKey2");
+		assertEquals(params.dockerParameters().get(1).getValue(), "testValue2");
+		assertEquals(params.dockerParameters().get(2).getKey(), "testParam3");
+		assertEquals(params.dockerParameters().get(2).getValue(), "key3=value3");
+		assertEquals(params.dockerParameters().get(3).getKey(), "testParam4");
+		assertEquals(params.dockerParameters().get(3).getValue(), "\"key4=value4\"");
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testContainerDockerParametersMalformed() throws Exception {
+		Configuration config = new Configuration();
+		config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam");
+		MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+	}
+
+	@Test
 	public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
 
 		MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));