You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/26 14:41:33 UTC
[07/16] flink git commit: [FLINK-8490] Allow custom docker parameters for docker tasks on Mesos
[FLINK-8490] Allow custom docker parameters for docker tasks on Mesos
This closes #5346.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6969fe2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6969fe2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6969fe2f
Branch: refs/heads/master
Commit: 6969fe2fa823be7748cee002a32df02fd1cae09f
Parents: a6d7f2d
Author: Leonid Ishimnikov <li...@users.noreply.github.com>
Authored: Tue Jan 23 13:55:17 2018 -0500
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 26 13:50:21 2018 +0100
----------------------------------------------------------------------
docs/ops/config.md | 2 +
docs/ops/deployment/mesos.md | 2 +
.../clusterframework/LaunchableMesosWorker.java | 1 +
.../clusterframework/MesosResourceManager.java | 1 +
.../MesosTaskManagerParameters.java | 47 ++++++++++++++++++++
.../MesosFlinkResourceManagerTest.java | 1 +
.../MesosResourceManagerTest.java | 3 +-
.../MesosTaskManagerParametersTest.java | 36 +++++++++++++++
8 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 8a4b1c1..43f281b 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -512,6 +512,8 @@ May be set to -1 to disable this feature.
- `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
+- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
+
- `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information.
### High Availability (HA)
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/docs/ops/deployment/mesos.md
----------------------------------------------------------------------
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 5771abe..4d08104 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -264,6 +264,8 @@ May be set to -1 to disable this feature.
`mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)
+`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)
+
`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)
`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index e71c703..b4176a8 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -298,6 +298,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
containerInfo
.setType(Protos.ContainerInfo.Type.DOCKER)
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+ .addAllParameters(params.dockerParameters())
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
.setImage(params.containerImageName().get()));
break;
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 8b67257..d76f2fe 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -672,6 +672,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
1,
new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
taskManagerParameters.containerVolumes(),
+ taskManagerParameters.dockerParameters(),
taskManagerParameters.constraints(),
taskManagerParameters.command(),
taskManagerParameters.bootstrapCommand(),
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 3859913..f0ba113 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -86,6 +86,10 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();
+ public static final ConfigOption<String> MESOS_RM_CONTAINER_DOCKER_PARAMETERS =
+ key("mesos.resourcemanager.tasks.container.docker.parameters")
+ .noDefaultValue();
+
public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
.noDefaultValue();
@@ -109,6 +113,8 @@ public class MesosTaskManagerParameters {
private final List<Protos.Volume> containerVolumes;
+ private final List<Protos.Parameter> dockerParameters;
+
private final List<ConstraintEvaluator> constraints;
private final String command;
@@ -123,6 +129,7 @@ public class MesosTaskManagerParameters {
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
List<Protos.Volume> containerVolumes,
+ List<Protos.Parameter> dockerParameters,
List<ConstraintEvaluator> constraints,
String command,
Option<String> bootstrapCommand,
@@ -133,6 +140,7 @@ public class MesosTaskManagerParameters {
this.containerImageName = Preconditions.checkNotNull(containerImageName);
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
+ this.dockerParameters = Preconditions.checkNotNull(dockerParameters);
this.constraints = Preconditions.checkNotNull(constraints);
this.command = Preconditions.checkNotNull(command);
this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
@@ -177,6 +185,13 @@ public class MesosTaskManagerParameters {
}
/**
+ * Get Docker runtime parameters.
+ */
+ public List<Protos.Parameter> dockerParameters() {
+ return dockerParameters;
+ }
+
+ /**
* Get the placement constraints.
*/
public List<ConstraintEvaluator> constraints() {
@@ -212,6 +227,7 @@ public class MesosTaskManagerParameters {
", containerImageName=" + containerImageName +
", containeredParameters=" + containeredParameters +
", containerVolumes=" + containerVolumes +
+ ", dockerParameters=" + dockerParameters +
", constraints=" + constraints +
", taskManagerHostName=" + taskManagerHostname +
", command=" + command +
@@ -260,8 +276,12 @@ public class MesosTaskManagerParameters {
Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));
+ Option<String> dockerParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS));
+
List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);
+ List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt);
+
//obtain Task Manager Host Name from the configuration
Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));
@@ -275,6 +295,7 @@ public class MesosTaskManagerParameters {
Option.apply(imageName),
containeredParameters,
containerVolumes,
+ dockerParameters,
constraints,
tmCommand,
tmBootstrapCommand,
@@ -365,6 +386,32 @@ public class MesosTaskManagerParameters {
}
}
+ public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
+ if (dockerParameters.isEmpty()) {
+ return Collections.emptyList();
+ } else {
+ String[] dockerParameterSpecifications = dockerParameters.get().split(",");
+
+ List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);
+
+ for (String dockerParameterSpecification : dockerParameterSpecifications) {
+ if (!dockerParameterSpecification.trim().isEmpty()) {
+ // split with the limit of 2 in case the value includes '='
+ String[] match = dockerParameterSpecification.split("=", 2);
+ if (match.length != 2) {
+ throw new IllegalArgumentException("Docker parameter specification is invalid, given: "
+ + dockerParameterSpecification);
+ }
+ Protos.Parameter.Builder parameter = Protos.Parameter.newBuilder();
+ parameter.setKey(match[0]);
+ parameter.setValue(match[1]);
+ parameters.add(parameter.build());
+ }
+ }
+ return parameters;
+ }
+ }
+
/**
* The supported containerizers.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index ff32486..c2455ec 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -250,6 +250,7 @@ public class MesosFlinkResourceManagerTest extends TestLogger {
Option.<String>empty(),
containeredParams,
Collections.<Protos.Volume>emptyList(),
+ Collections.<Protos.Parameter>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
"",
Option.<String>empty(),
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 85ba142..14e8ed9 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -263,7 +263,8 @@ public class MesosResourceManagerTest extends TestLogger {
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
- Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
+ Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(),
+ Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
Option.<String>empty());
// resource manager
http://git-wip-us.apache.org/repos/asf/flink/blob/6969fe2f/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
index 1f33cb5..84b0ff2 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java
@@ -85,6 +85,42 @@ public class MesosTaskManagerParametersTest extends TestLogger {
}
@Test
+ public void testContainerDockerParameter() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue");
+
+ MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+ assertEquals(params.dockerParameters().size(), 1);
+ assertEquals(params.dockerParameters().get(0).getKey(), "testKey");
+ assertEquals(params.dockerParameters().get(0).getValue(), "testValue");
+ }
+
+ @Test
+ public void testContainerDockerParameters() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
+ "testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");
+
+ MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+ assertEquals(params.dockerParameters().size(), 4);
+ assertEquals(params.dockerParameters().get(0).getKey(), "testKey1");
+ assertEquals(params.dockerParameters().get(0).getValue(), "testValue1");
+ assertEquals(params.dockerParameters().get(1).getKey(), "testKey2");
+ assertEquals(params.dockerParameters().get(1).getValue(), "testValue2");
+ assertEquals(params.dockerParameters().get(2).getKey(), "testParam3");
+ assertEquals(params.dockerParameters().get(2).getValue(), "key3=value3");
+ assertEquals(params.dockerParameters().get(3).getKey(), "testParam4");
+ assertEquals(params.dockerParameters().get(3).getValue(), "\"key4=value4\"");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testContainerDockerParametersMalformed() throws Exception {
+ Configuration config = new Configuration();
+ config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam");
+ MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
+ }
+
+ @Test
public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withHardHostAttrConstraintConfiguration("cluster:foo,az:eu-west-1"));