You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/12/05 23:29:11 UTC
[2/3] flink git commit: [FLINK-5091] Formalize the Mesos AppMaster
environment for docker compatibility
[FLINK-5091] Formalize the Mesos AppMaster environment for docker compatibility
- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
  special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
  of dispatcher, Path-based).
- moved some test code related to overriding the JVM's env.
- moved the Mesos containerizer config code to the MesosTaskManagerParameters.
This closes #2915.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/230bf17b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/230bf17b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/230bf17b
Branch: refs/heads/master
Commit: 230bf17bac3d76959a5cb6aa73ac685757c51cab
Parents: 3b85f42
Author: wrighe3 <er...@emc.com>
Authored: Thu Dec 1 00:21:28 2016 -0800
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Dec 6 00:29:25 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 15 +
.../configuration/GlobalConfiguration.java | 27 +-
flink-dist/src/main/assemblies/bin.xml | 6 +
.../main/flink-bin/mesos-bin/mesos-appmaster.sh | 51 +++
.../flink-bin/mesos-bin/mesos-taskmanager.sh | 60 +++
.../main/java/org/apache/flink/mesos/Utils.java | 21 +
.../clusterframework/LaunchableMesosWorker.java | 106 ++++-
.../MesosApplicationMasterRunner.java | 456 ++++++-------------
.../clusterframework/MesosConfigKeys.java | 25 +-
.../MesosFlinkResourceManager.java | 45 +-
.../MesosTaskManagerParameters.java | 106 ++++-
.../MesosTaskManagerRunner.java | 73 ++-
.../flink/mesos/util/MesosArtifactResolver.java | 31 ++
.../flink/mesos/util/MesosArtifactServer.java | 146 ++++--
.../MesosFlinkResourceManagerTest.java | 19 +-
.../clusterframework/BootstrapTools.java | 36 ++
.../ContainerSpecification.java | 206 +++++++++
.../overlays/AbstractContainerOverlay.java | 72 +++
.../overlays/CompositeContainerOverlay.java | 49 ++
.../overlays/ContainerOverlay.java | 37 ++
.../overlays/FlinkDistributionOverlay.java | 126 +++++
.../overlays/HadoopConfOverlay.java | 147 ++++++
.../overlays/HadoopUserOverlay.java | 83 ++++
.../overlays/KeytabOverlay.java | 102 +++++
.../overlays/Krb5ConfOverlay.java | 111 +++++
.../overlays/SSLStoreOverlay.java | 124 +++++
.../flink/runtime/security/SecurityUtils.java | 4 +-
.../overlays/ContainerOverlayTestBase.java | 73 +++
.../overlays/FlinkDistributionOverlayTest.java | 117 +++++
.../overlays/HadoopConfOverlayTest.java | 119 +++++
.../overlays/HadoopUserOverlayTest.java | 73 +++
.../overlays/KeytabOverlayTest.java | 71 +++
.../overlays/Krb5ConfOverlayTest.java | 59 +++
.../overlays/SSLStoreOverlayTest.java | 78 ++++
.../flink/core/testutils/CommonTestUtils.java | 39 ++
.../apache/flink/test/util/TestBaseUtils.java | 38 +-
36 files changed, 2450 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 6bc5e2e..a515c33 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -440,6 +440,11 @@ public final class ConfigConstants {
// ------------------------ Mesos Configuration ------------------------
/**
+ * The initial number of Mesos tasks to allocate.
+ */
+ public static final String MESOS_INITIAL_TASKS = "mesos.initial-tasks";
+
+ /**
* The maximum number of failed Mesos tasks before entirely stopping
* the Mesos session / job on Mesos.
*
@@ -484,6 +489,8 @@ public final class ConfigConstants {
public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET = "mesos.resourcemanager.framework.secret";
+ public static final String MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "mesos.resourcemanager.framework.user";
+
/**
* The cpus to acquire from Mesos.
*
@@ -1186,6 +1193,8 @@ public final class ConfigConstants {
public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE = "*";
+ public static final String DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER = "";
+
/** Default value to override SSL support for the Artifact Server */
public static final boolean DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED = true;
@@ -1405,6 +1414,12 @@ public final class ConfigConstants {
/** The environment variable name which contains the location of the lib folder */
public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
+ /** The environment variable name which contains the location of the bin directory */
+ public static final String ENV_FLINK_BIN_DIR = "FLINK_BIN_DIR";
+
+ /** The environment variable name which contains the Flink installation root directory */
+ public static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";
+
// -------------------------------- Security -------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index ecfbc72..dca6307 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -39,12 +39,31 @@ public final class GlobalConfiguration {
public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+
// --------------------------------------------------------------------------------------------
private GlobalConfiguration() {}
// --------------------------------------------------------------------------------------------
+ private static Configuration dynamicProperties = null;
+
+ /**
+ * Set the process-wide dynamic properties to be merged with the loaded configuration.
+ */
+ public static void setDynamicProperties(Configuration dynamicProperties) {
+ GlobalConfiguration.dynamicProperties = new Configuration(dynamicProperties);
+ }
+
+ /**
+ * Get the dynamic properties.
+ */
+ public static Configuration getDynamicProperties() {
+ return GlobalConfiguration.dynamicProperties;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
/**
* Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
* empty configuration object if the environment variable is not set. In production this variable is set but
@@ -90,7 +109,13 @@ public final class GlobalConfiguration {
"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
}
- return loadYAMLResource(yamlConfigFile);
+ Configuration conf = loadYAMLResource(yamlConfigFile);
+
+ if(dynamicProperties != null) {
+ conf.addAll(dynamicProperties);
+ }
+
+ return conf;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index b4291d3..901cac9 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -82,6 +82,12 @@ under the License.
<outputDirectory>bin</outputDirectory>
<fileMode>0755</fileMode>
</fileSet>
+ <!-- copy Mesos start scripts -->
+ <fileSet>
+ <directory>src/main/flink-bin/mesos-bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
<!-- copy default configuration -->
<fileSet>
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
new file mode 100755
index 0000000..d65c6b0
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxiliary function to construct a lightweight classpath for the
+# Flink AppMaster
+constructAppMasterClassPath() {
+
+ while read -d '' -r jarfile ; do
+ if [[ $CC_CLASSPATH = "" ]]; then
+ CC_CLASSPATH="$jarfile";
+ else
+ CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+ echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+
+log=flink-appmaster.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner "$@"
+
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
new file mode 100755
index 0000000..ff03abd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+# get Flink config
+. "$bin"/config.sh
+
+# auxiliary function to construct a lightweight classpath for the
+# Flink TaskManager
+constructTaskManagerClassPath() {
+
+ while read -d '' -r jarfile ; do
+ if [[ $CC_CLASSPATH = "" ]]; then
+ CC_CLASSPATH="$jarfile";
+ else
+ CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+
+ echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
+}
+
+CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+
+log=flink-taskmanager.log
+log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
+
+# Add precomputed memory JVM options
+if [ -z "${FLINK_ENV_JAVA_OPTS_MEM}" ]; then
+ FLINK_ENV_JAVA_OPTS_MEM=""
+fi
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_MEM}"
+
+# Add TaskManager-specific JVM options
+export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_LIB_DIR
+
+$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager "$@"
+
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
index 173ae33..7787e40 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java
@@ -18,7 +18,10 @@
package org.apache.flink.mesos;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.mesos.Protos;
+import scala.Option;
import java.net.URL;
import java.util.Arrays;
@@ -46,6 +49,24 @@ public class Utils {
}
/**
+ * Construct a Mesos URI.
+ */
+ public static Protos.CommandInfo.URI uri(MesosArtifactResolver resolver, ContainerSpecification.Artifact artifact) {
+ Option<URL> url = resolver.resolve(artifact.dest);
+ if(url.isEmpty()) {
+ throw new IllegalArgumentException("Unresolvable artifact: " + artifact.dest);
+ }
+
+ return Protos.CommandInfo.URI.newBuilder()
+ .setValue(url.get().toExternalForm())
+ .setOutputFile(artifact.dest.toString())
+ .setExtract(artifact.extract)
+ .setCache(artifact.cachable)
+ .setExecutable(artifact.executable)
+ .build();
+ }
+
+ /**
* Construct a scalar resource value.
*/
public static Protos.Resource scalar(String name, double value) {
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 5f940b5..c6e51f1 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -23,8 +23,11 @@ import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.Utils;
import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.mesos.Protos;
import java.util.Collections;
@@ -38,7 +41,10 @@ import static org.apache.flink.mesos.Utils.ranges;
import static org.apache.flink.mesos.Utils.scalar;
/**
- * Specifies how to launch a Mesos worker.
+ * Implements the launch of a Mesos worker.
+ *
+ * Translates the abstract {@link ContainerSpecification} into a concrete
+ * Mesos-specific {@link org.apache.mesos.Protos.TaskInfo}.
*/
public class LaunchableMesosWorker implements LaunchableTask {
@@ -49,20 +55,24 @@ public class LaunchableMesosWorker implements LaunchableTask {
"taskmanager.rpc.port",
"taskmanager.data.port" };
+ private final MesosArtifactResolver resolver;
+ private final ContainerSpecification containerSpec;
private final MesosTaskManagerParameters params;
- private final Protos.TaskInfo.Builder template;
private final Protos.TaskID taskID;
private final Request taskRequest;
/**
* Construct a launchable Mesos worker.
* @param params the TM parameters such as memory, cpu to acquire.
- * @param template a template for the TaskInfo to be constructed at launch time.
+ * @param containerSpec an abstract container specification for launch time.
* @param taskID the taskID for this worker.
*/
- public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+ public LaunchableMesosWorker(
+ MesosArtifactResolver resolver, MesosTaskManagerParameters params,
+ ContainerSpecification containerSpec, Protos.TaskID taskID) {
+ this.resolver = resolver;
this.params = params;
- this.template = template;
+ this.containerSpec = containerSpec;
this.taskID = taskID;
this.taskRequest = new Request();
}
@@ -157,17 +167,25 @@ public class LaunchableMesosWorker implements LaunchableTask {
@Override
public Protos.TaskInfo launch(Protos.SlaveID slaveId, TaskAssignmentResult assignment) {
+ ContaineredTaskManagerParameters tmParams = params.containeredParameters();
+
final Configuration dynamicProperties = new Configuration();
- // specialize the TaskInfo template with assigned resources, environment variables, etc
- final Protos.TaskInfo.Builder taskInfo = template
- .clone()
+ // incorporate the dynamic properties set by the template
+ dynamicProperties.addAll(containerSpec.getDynamicConfiguration());
+
+ // build a TaskInfo with assigned resources, environment variables, etc
+ final Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder()
.setSlaveId(slaveId)
.setTaskId(taskID)
.setName(taskID.getValue())
.addResources(scalar("cpus", assignment.getRequest().getCPUs()))
.addResources(scalar("mem", assignment.getRequest().getMemory()));
+ final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder();
+ final Protos.Environment.Builder env = cmd.getEnvironmentBuilder();
+ final StringBuilder jvmArgs = new StringBuilder();
+
// use the assigned ports for the TM
if (assignment.getAssignedPorts().size() < TM_PORT_KEYS.length) {
throw new IllegalArgumentException("unsufficient # of ports assigned");
@@ -179,17 +197,69 @@ public class LaunchableMesosWorker implements LaunchableTask {
dynamicProperties.setInteger(key, port);
}
- // finalize environment variables
- final Protos.Environment.Builder environmentBuilder = taskInfo.getCommandBuilder().getEnvironmentBuilder();
+ // ship additional files
+ for(ContainerSpecification.Artifact artifact : containerSpec.getArtifacts()) {
+ cmd.addUris(Utils.uri(resolver, artifact));
+ }
- // propagate the Mesos task ID to the TM
- environmentBuilder
- .addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+ // propagate environment variables
+ for (Map.Entry<String, String> entry : params.containeredParameters().taskManagerEnv().entrySet()) {
+ env.addVariables(variable(entry.getKey(), entry.getValue()));
+ }
+ for (Map.Entry<String, String> entry : containerSpec.getEnvironmentVariables().entrySet()) {
+ env.addVariables(variable(entry.getKey(), entry.getValue()));
+ }
- // propagate the dynamic configuration properties to the TM
- String dynamicPropertiesEncoded = FlinkMesosSessionCli.encodeDynamicProperties(dynamicProperties);
- environmentBuilder
- .addVariables(variable(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded));
+ // propagate the Mesos task ID to the TM
+ env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
+
+ // finalize the memory parameters
+ jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+ jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+ jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+
+ // pass dynamic system properties
+ jvmArgs.append(' ').append(
+ ContainerSpecification.formatSystemProperties(containerSpec.getSystemProperties()));
+
+ // finalize JVM args
+ env.addVariables(variable(MesosConfigKeys.ENV_JVM_ARGS, jvmArgs.toString()));
+
+ // build the launch command w/ dynamic application properties
+ StringBuilder launchCommand = new StringBuilder("$FLINK_HOME/bin/mesos-taskmanager.sh ");
+ launchCommand.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+ cmd.setValue(launchCommand.toString());
+
+ // build the container info
+ Protos.ContainerInfo.Builder containerInfo = null;
+ switch(params.containerType()) {
+ case MESOS:
+ if(params.containerImageName().isDefined()) {
+ containerInfo = Protos.ContainerInfo.newBuilder()
+ .setType(Protos.ContainerInfo.Type.MESOS)
+ .setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
+ .setImage(Protos.Image.newBuilder()
+ .setType(Protos.Image.Type.DOCKER)
+ .setDocker(Protos.Image.Docker.newBuilder()
+ .setName(params.containerImageName().get()))));
+ }
+ break;
+
+ case DOCKER:
+ assert(params.containerImageName().isDefined());
+ containerInfo = Protos.ContainerInfo.newBuilder()
+ .setType(Protos.ContainerInfo.Type.DOCKER)
+ .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
+ .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
+ .setImage(params.containerImageName().get()));
+ break;
+
+ default:
+ throw new IllegalStateException("unsupported container type");
+ }
+ if(containerInfo != null) {
+ taskInfo.setContainer(containerInfo);
+ }
return taskInfo.build();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index ef58250..4b9bd82 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -22,14 +22,17 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
-
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
@@ -38,14 +41,20 @@ import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.mesos.util.ZooKeeperUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
@@ -53,21 +62,15 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
-
-import org.apache.hadoop.security.UserGroupInformation;
-
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import scala.Option;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.io.File;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
-import java.net.UnknownHostException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -75,9 +78,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.mesos.Utils.uri;
-import static org.apache.flink.mesos.Utils.variable;
-
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -106,6 +106,18 @@ public class MesosApplicationMasterRunner {
private static final int ACTOR_DIED_EXIT_CODE = 32;
// ------------------------------------------------------------------------
+ // Command-line options
+ // ------------------------------------------------------------------------
+
+ private static final Options ALL_OPTIONS;
+
+ static {
+ ALL_OPTIONS =
+ new Options()
+ .addOption(BootstrapTools.newDynamicPropertiesOption());
+ }
+
+ // ------------------------------------------------------------------------
// Program entry point
// ------------------------------------------------------------------------
@@ -126,36 +138,44 @@ public class MesosApplicationMasterRunner {
/**
* The instance entry point for the Mesos AppMaster. Obtains user group
- * information and calls the main work method {@link #runPrivileged(Configuration)} as a
+ * information and calls the main work method {@link #runPrivileged(Configuration,Configuration)} as a
* privileged action.
*
* @param args The command line arguments.
* @return The process exit code.
*/
- protected int run(String[] args) {
+ protected int run(final String[] args) {
try {
LOG.debug("All environment variables: {}", ENV);
- final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
- checkState(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+ // loading all config values here has the advantage that the program fails fast, if any
+ // configuration problem occurs
- // Flink configuration
- final Configuration dynamicProperties =
- FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
- LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(ALL_OPTIONS, args);
- final Configuration configuration = createConfiguration(workingDir, dynamicProperties);
+ final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+ GlobalConfiguration.setDynamicProperties(dynamicProperties);
+ final Configuration config = GlobalConfiguration.loadConfiguration();
- SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
+ // configure the default filesystem
+ try {
+ FileSystem.setDefaultScheme(config);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " +
+ "filesystem scheme from configuration.", e);
+ }
+
+ // configure security
+ SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(config);
sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
SecurityUtils.install(sc);
- LOG.info("Running Flink as user {}", UserGroupInformation.getCurrentUser().getShortUserName());
-
+ // run the actual work in the installed security context
return SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
- public Integer call() {
- return runPrivileged(configuration);
+ public Integer call() throws Exception {
+ return runPrivileged(config, dynamicProperties);
}
});
}
@@ -175,78 +195,38 @@ public class MesosApplicationMasterRunner {
*
* @return The return code for the Java process.
*/
- protected int runPrivileged(Configuration config) {
+ protected int runPrivileged(Configuration config, Configuration dynamicProperties) {
ActorSystem actorSystem = null;
WebMonitor webMonitor = null;
MesosArtifactServer artifactServer = null;
-
- // ------- (1) load and parse / validate all configurations -------
-
- // loading all config values here has the advantage that the program fails fast, if any
- // configuration problem occurs
-
- final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
-
- final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID);
- checkState(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID);
-
- // Note that we use the "appMasterHostname" given by the system, to make sure
- // we use the hostnames consistently throughout akka.
- // for akka "localhost" and "localhost.localdomain" are different actors.
- final String appMasterHostname;
+ ExecutorService futureExecutor = null;
+ ExecutorService ioExecutor = null;
try {
- appMasterHostname = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException uhe) {
- LOG.error("Could not retrieve the local hostname.", uhe);
+ // ------- (1) load and parse / validate all configurations -------
- return INIT_ERROR_EXIT_CODE;
- }
+ // Note that we use the "appMasterHostname" given by the system, to make sure
+ // we use the hostnames consistently throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are different actors.
+ final String appMasterHostname = InetAddress.getLocalHost().getHostName();
- // Mesos configuration
- final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
+ // Mesos configuration
+ final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname);
- int numberProcessors = Hardware.getNumberCPUCores();
+ // JM configuration
+ int numberProcessors = Hardware.getNumberCPUCores();
- final ExecutorService futureExecutor = Executors.newFixedThreadPool(
- numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
+ futureExecutor = Executors.newFixedThreadPool(
+ numberProcessors,
+ new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));
- final ExecutorService ioExecutor = Executors.newFixedThreadPool(
- numberProcessors,
- new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
+ ioExecutor = Executors.newFixedThreadPool(
+ numberProcessors,
+ new NamedThreadFactory("mesos-jobmanager-io-", "-thread-"));
- try {
- // environment values related to TM
- final int taskManagerContainerMemory;
- final int numInitialTaskManagers;
- final int slotsPerTaskManager;
-
- try {
- taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
- } catch (NumberFormatException e) {
- throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : "
- + e.getMessage());
- }
- try {
- numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
- } catch (NumberFormatException e) {
- throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_COUNT + " : "
- + e.getMessage());
- }
- try {
- slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
- } catch (NumberFormatException e) {
- throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : "
- + e.getMessage());
- }
-
- final ContaineredTaskManagerParameters containeredParameters =
- ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager);
-
- final MesosTaskManagerParameters taskManagerParameters =
- MesosTaskManagerParameters.create(config, containeredParameters);
+ // TM configuration
+ final MesosTaskManagerParameters taskManagerParameters = MesosTaskManagerParameters.create(config);
LOG.info("TaskManagers will be created with {} task slots",
taskManagerParameters.containeredParameters().numSlots());
@@ -257,7 +237,7 @@ public class MesosApplicationMasterRunner {
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
taskManagerParameters.cpus());
- // JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources)
+ // JM endpoint, which should be explicitly configured based on acquired net resources
final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
checkState(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" +
@@ -279,18 +259,28 @@ public class MesosApplicationMasterRunner {
LOG.debug("Starting Artifact Server");
final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
- artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort, config);
+ final String artifactServerPrefix = UUID.randomUUID().toString();
+ artifactServer = new MesosArtifactServer(artifactServerPrefix, akkaHostname, artifactServerPort, config);
// ----------------- (3) Generate the configuration for the TaskManagers -------------------
+ // generate a container spec which conveys the artifacts/vars needed to launch a TM
+ ContainerSpecification taskManagerContainerSpec = new ContainerSpecification();
+
+ // propagate the AM dynamic configuration to the TM
+ taskManagerContainerSpec.getDynamicConfiguration().addAll(dynamicProperties);
+
+ // propagate newly-generated configuration elements
final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
- config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
- LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+ new Configuration(), akkaHostname, akkaPort, taskManagerParameters.containeredParameters().numSlots(),
+ TASKMANAGER_REGISTRATION_TIMEOUT);
+ taskManagerContainerSpec.getDynamicConfiguration().addAll(taskManagerConfig);
+
+ // apply the overlays
+ applyOverlays(config, taskManagerContainerSpec);
- final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext(
- config, ENV,
- taskManagerParameters, taskManagerConfig,
- workingDir, getTaskManagerClass(), artifactServer, LOG);
+ // configure the artifact server to serve the specified artifacts
+ configureArtifactServer(artifactServer, taskManagerContainerSpec);
// ----------------- (4) start the actors -------------------
@@ -341,8 +331,8 @@ public class MesosApplicationMasterRunner {
workerStore,
leaderRetriever,
taskManagerParameters,
- taskManagerContext,
- numInitialTaskManagers,
+ taskManagerContainerSpec,
+ artifactServer,
LOG);
ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
@@ -389,8 +379,21 @@ public class MesosApplicationMasterRunner {
}
}
- futureExecutor.shutdownNow();
- ioExecutor.shutdownNow();
+ if(futureExecutor != null) {
+ try {
+ futureExecutor.shutdownNow();
+ } catch (Throwable tt) {
+ LOG.error("Error shutting down future executor", tt);
+ }
+ }
+
+ if(ioExecutor != null) {
+ try {
+ ioExecutor.shutdownNow();
+ } catch (Throwable tt) {
+ LOG.error("Error shutting down io executor", tt);
+ }
+ }
return INIT_ERROR_EXIT_CODE;
}
@@ -442,35 +445,12 @@ public class MesosApplicationMasterRunner {
return MemoryArchivist.class;
}
- protected Class<? extends TaskManager> getTaskManagerClass() {
- return MesosTaskManager.class;
- }
-
- /**
- *
- * @param baseDirectory
- * @param additional
- *
- * @return The configuration to be used by the TaskManagers.
- */
- private static Configuration createConfiguration(String baseDirectory, Configuration additional) {
- LOG.info("Loading config from directory {}", baseDirectory);
-
- Configuration configuration = GlobalConfiguration.loadConfiguration();
-
- // add dynamic properties to JobManager configuration.
- configuration.addAll(additional);
-
- return configuration;
- }
-
/**
* Loads and validates the ResourceManager Mesos configuration from the given Flink configuration.
*/
public static MesosConfiguration createMesosConfig(Configuration flinkConfig, String hostname) {
Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
- .setUser("")
.setHostname(hostname);
Protos.Credential.Builder credential = null;
@@ -494,6 +474,10 @@ public class MesosApplicationMasterRunner {
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE,
ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_ROLE));
+ frameworkInfo.setUser(flinkConfig.getString(
+ ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_USER,
+ ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_FRAMEWORK_USER));
+
if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
frameworkInfo.setPrincipal(flinkConfig.getString(
ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_PRINCIPAL, null));
@@ -501,15 +485,16 @@ public class MesosApplicationMasterRunner {
credential = Protos.Credential.newBuilder();
credential.setPrincipal(frameworkInfo.getPrincipal());
- if(!flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
- throw new IllegalConfigurationException(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET + " must be configured.");
+ // some environments use a side-channel to communicate the secret to Mesos,
+ // and thus don't set the 'secret' configuration setting
+ if(flinkConfig.containsKey(ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET)) {
+ credential.setSecret(flinkConfig.getString(
+ ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
}
- credential.setSecret(flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_FRAMEWORK_SECRET, null));
}
MesosConfiguration mesos =
- new MesosConfiguration(masterUrl, frameworkInfo, Option.apply(credential));
+ new MesosConfiguration(masterUrl, frameworkInfo, scala.Option.apply(credential));
return mesos;
}
@@ -533,203 +518,34 @@ public class MesosApplicationMasterRunner {
}
/**
- * Creates a Mesos task info template, which describes how to bring up a TaskManager process as
- * a Mesos task.
+ * Generate a container specification as a TaskManager template.
*
* <p>This code is extremely Mesos-specific and registers all the artifacts that the TaskManager
- * needs (such as JAR file, config file, ...) and all environment variables in a task info record.
+ * needs (such as JAR file, config file, ...) and all environment variables into a container specification.
* The Mesos fetcher then ensures that those artifacts will be copied into the task's sandbox directory.
* A lightweight HTTP server serves the artifacts to the fetcher.
- *
- * <p>We do this work before we start the ResourceManager actor in order to fail early if
- * any of the operations here fail.
- *
- * @param flinkConfig
- * The Flink configuration object.
- * @param env
- * The environment variables.
- * @param tmParams
- * The TaskManager container memory parameters.
- * @param taskManagerConfig
- * The configuration for the TaskManagers.
- * @param workingDirectory
- * The current application master container's working directory.
- * @param taskManagerMainClass
- * The class with the main method.
- * @param artifactServer
- * The artifact server.
- * @param log
- * The logger.
- *
- * @return The task info template for the TaskManager processes.
- *
- * @throws Exception Thrown if the task info could not be created, for example if
- * the resources could not be copied.
- */
- public static Protos.TaskInfo.Builder createTaskManagerContext(
- Configuration flinkConfig,
- Map<String, String> env,
- MesosTaskManagerParameters tmParams,
- Configuration taskManagerConfig,
- String workingDirectory,
- Class<?> taskManagerMainClass,
- MesosArtifactServer artifactServer,
- Logger log) throws Exception {
-
-
- Protos.TaskInfo.Builder info = Protos.TaskInfo.newBuilder();
- Protos.CommandInfo.Builder cmd = Protos.CommandInfo.newBuilder();
-
- log.info("Setting up artifacts for TaskManagers");
-
- String shipListString = env.get(MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
- checkState(shipListString != null, "Environment variable %s not set", MesosConfigKeys.ENV_CLIENT_SHIP_FILES);
-
- String classPathString = env.get(MesosConfigKeys.ENV_FLINK_CLASSPATH);
- checkState(classPathString != null, "Environment variable %s not set", MesosConfigKeys.ENV_FLINK_CLASSPATH);
-
- // register the Flink jar
- final File flinkJarFile = new File(workingDirectory, "flink.jar");
- cmd.addUris(uri(artifactServer.addFile(flinkJarFile, "flink.jar"), true));
-
- String hadoopConfDir = env.get("HADOOP_CONF_DIR");
- LOG.debug("ENV: hadoopConfDir = {}", hadoopConfDir);
-
- //upload Hadoop configurations to artifact server
- boolean hadoopConf = false;
- if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
- File source = new File(hadoopConfDir);
- if(source.exists() && source.isDirectory()) {
- hadoopConf = true;
- File[] fileList = source.listFiles();
- for(File file: fileList) {
- if(file.getName().equals("core-site.xml") || file.getName().equals("hdfs-site.xml")) {
- LOG.debug("Adding local file: [{}] to artifact server", file);
- File f = new File(hadoopConfDir, file.getName());
- cmd.addUris(uri(artifactServer.addFile(f, file.getName()), true));
- }
- }
- }
- }
-
- //upload keytab to the artifact server
- String keytabFileName = null;
- String keytab = flinkConfig.getString(ConfigConstants.SECURITY_KEYTAB_KEY, null);
- if(keytab != null) {
- File source = new File(keytab);
- if(source.exists()) {
- LOG.debug("Adding keytab file: [{}] to artifact server", source);
- keytabFileName = source.getName();
- cmd.addUris(uri(artifactServer.addFile(source, source.getName()), true));
- }
- }
-
- String principal = flinkConfig.getString(ConfigConstants.SECURITY_PRINCIPAL_KEY, null);
- if(keytabFileName != null && principal != null) {
- //reset the configurations since we will use in-memory reference from within the TM instance
- taskManagerConfig.setString(ConfigConstants.SECURITY_KEYTAB_KEY,"");
- taskManagerConfig.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY,"");
- }
-
- // register the TaskManager configuration
- final File taskManagerConfigFile =
- new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
- LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
- BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
- cmd.addUris(uri(artifactServer.addFile(taskManagerConfigFile, GlobalConfiguration.FLINK_CONF_FILENAME), true));
-
- // prepare additional files to be shipped
- for (String pathStr : shipListString.split(",")) {
- if (!pathStr.isEmpty()) {
- File shipFile = new File(workingDirectory, pathStr);
- cmd.addUris(uri(artifactServer.addFile(shipFile, shipFile.getName()), true));
- }
- }
-
- log.info("Creating task info for TaskManagers");
-
- // build the launch command
- boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
- boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
- boolean hasKrb5 = false;
-
- String launchCommand = BootstrapTools.getTaskManagerShellCommand(
- flinkConfig, tmParams.containeredParameters(), ".", ".",
- hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
- cmd.setValue(launchCommand);
-
- // build the environment variables
- Protos.Environment.Builder envBuilder = Protos.Environment.newBuilder();
- for (Map.Entry<String, String> entry : tmParams.containeredParameters().taskManagerEnv().entrySet()) {
- envBuilder.addVariables(variable(entry.getKey(), entry.getValue()));
- }
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_CLASSPATH, classPathString));
-
- //add hadoop config directory to the environment
- if(hadoopConf) {
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_CONF_DIR, "."));
- }
-
- //add keytab and principal to environment
- if(keytabFileName != null && principal != null) {
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB, keytabFileName));
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL, principal));
- }
-
- envBuilder.addVariables(variable(MesosConfigKeys.ENV_HADOOP_USER_NAME,
- UserGroupInformation.getCurrentUser().getUserName()));
-
- cmd.setEnvironment(envBuilder);
-
- info.setCommand(cmd);
-
- // Set container for task manager if specified in configs.
- String tmImageName = flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_NAME, "");
-
- if (tmImageName.length() > 0) {
- String taskManagerContainerType = flinkConfig.getString(
- ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
- ConfigConstants.DEFAULT_MESOS_RESOURCEMANAGER_TASKS_CONTAINER_IMAGE_TYPE);
-
- Protos.ContainerInfo.Builder containerInfo;
-
- switch (taskManagerContainerType) {
- case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
- containerInfo = Protos.ContainerInfo.newBuilder()
- .setType(Protos.ContainerInfo.Type.MESOS)
- .setMesos(Protos.ContainerInfo.MesosInfo.newBuilder()
- .setImage(Protos.Image.newBuilder()
- .setType(Protos.Image.Type.DOCKER)
- .setDocker(Protos.Image.Docker.newBuilder()
- .setName(tmImageName))));
- break;
- case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
- containerInfo = Protos.ContainerInfo.newBuilder()
- .setType(Protos.ContainerInfo.Type.DOCKER)
- .setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
- .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
- .setImage(tmImageName));
- break;
- default:
- LOG.warn(
- "Invalid container type '{}' provided for setting {}. Valid values are '{}' or '{}'. " +
- "Starting task managers now without container.",
- taskManagerContainerType,
- ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE,
- ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS,
- ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER);
-
- containerInfo = null;
-
- break;
- }
+ */
+ private static void applyOverlays(
+ Configuration globalConfiguration, ContainerSpecification containerSpec) throws IOException {
+
+ // create the overlays that will produce the specification
+ CompositeContainerOverlay overlay = new CompositeContainerOverlay(
+ FlinkDistributionOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+ HadoopConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+ HadoopUserOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+ KeytabOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+ Krb5ConfOverlay.newBuilder().fromEnvironment(globalConfiguration).build(),
+ SSLStoreOverlay.newBuilder().fromEnvironment(globalConfiguration).build()
+ );
+
+ // apply the overlays
+ overlay.configure(containerSpec);
+ }
- if (containerInfo != null) {
- info.setContainer(containerInfo);
- }
+ private static void configureArtifactServer(MesosArtifactServer server, ContainerSpecification container) throws IOException {
+ // serve the artifacts associated with the container environment
+ for(ContainerSpecification.Artifact artifact : container.getArtifacts()) {
+ server.addPath(artifact.source, artifact.dest);
}
-
- return info;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
index bc6dde4..ebd9af5 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
@@ -26,21 +26,20 @@ public class MesosConfigKeys {
// Environment variable names
// ------------------------------------------------------------------------
- public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
- public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
- public static final String ENV_SLOTS = "_SLOTS";
- public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
- public static final String ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
- public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
+ /**
+ * The Mesos task ID, used by the TM for informational purposes
+ */
public static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
+ /**
+ * Reserved for future enhancement
+ */
public static final String ENV_FLINK_TMP_DIR = "_FLINK_TMP_DIR";
- public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
- public static final String ENV_CLASSPATH = "CLASSPATH";
- public static final String ENV_MESOS_SANDBOX = "MESOS_SANDBOX";
- public static final String ENV_SESSION_ID = "_CLIENT_SESSION_ID";
- public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
- public static final String ENV_KEYTAB = "_KEYTAB_FILE";
- public static final String ENV_KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
+
+ /**
+ * JVM arguments, used by the JM and TM
+ */
+ public static final String ENV_JVM_ARGS = "JVM_ARGS";
/** Private constructor to prevent instantiation */
private MesosConfigKeys() {}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
index 6b24ee8..a7321a3 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
@@ -27,6 +27,7 @@ import com.netflix.fenzo.functions.Action1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
import org.apache.flink.mesos.scheduler.ConnectionMonitor;
import org.apache.flink.mesos.scheduler.LaunchableTask;
@@ -44,9 +45,11 @@ import org.apache.flink.mesos.scheduler.messages.ReRegistered;
import org.apache.flink.mesos.scheduler.messages.Registered;
import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -76,8 +79,11 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
/** The TaskManager container parameters (like container memory size) */
private final MesosTaskManagerParameters taskManagerParameters;
- /** Context information used to start a TaskManager Java process */
- private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+ /** Container specification for launching a TM */
+ private final ContainerSpecification taskManagerContainerSpec;
+
+ /** Resolver for HTTP artifacts **/
+ private final MesosArtifactResolver artifactResolver;
/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
private final int maxFailedTasks;
@@ -112,7 +118,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
MesosWorkerStore workerStore,
LeaderRetrievalService leaderRetrievalService,
MesosTaskManagerParameters taskManagerParameters,
- Protos.TaskInfo.Builder taskManagerLaunchContext,
+ ContainerSpecification taskManagerContainerSpec,
+ MesosArtifactResolver artifactResolver,
int maxFailedTasks,
int numInitialTaskManagers) {
@@ -121,9 +128,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
this.mesosConfig = requireNonNull(mesosConfig);
this.workerStore = requireNonNull(workerStore);
+ this.artifactResolver = requireNonNull(artifactResolver);
this.taskManagerParameters = requireNonNull(taskManagerParameters);
- this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+ this.taskManagerContainerSpec = requireNonNull(taskManagerContainerSpec);
this.maxFailedTasks = maxFailedTasks;
this.workersInNew = new HashMap<>();
@@ -661,7 +669,7 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) {
LaunchableMesosWorker launchable =
- new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID);
+ new LaunchableMesosWorker(artifactResolver, taskManagerParameters, taskManagerContainerSpec, taskID);
return launchable;
}
@@ -723,10 +731,10 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
* The Flink configuration object.
* @param taskManagerParameters
* The parameters for launching TaskManager containers.
- * @param taskManagerLaunchContext
- * The parameters for launching the TaskManager processes in the TaskManager containers.
- * @param numInitialTaskManagers
- * The initial number of TaskManagers to allocate.
+ * @param taskManagerContainerSpec
+ * The container specification.
+ * @param artifactResolver
+ * The artifact resolver to locate artifacts
* @param log
* The logger to log to.
*
@@ -738,10 +746,22 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
MesosWorkerStore workerStore,
LeaderRetrievalService leaderRetrievalService,
MesosTaskManagerParameters taskManagerParameters,
- Protos.TaskInfo.Builder taskManagerLaunchContext,
- int numInitialTaskManagers,
+ ContainerSpecification taskManagerContainerSpec,
+ MesosArtifactResolver artifactResolver,
Logger log)
{
+
+ final int numInitialTaskManagers = flinkConfig.getInteger(
+ ConfigConstants.MESOS_INITIAL_TASKS, 0);
+ if (numInitialTaskManagers >= 0) {
+ log.info("Mesos framework to allocate {} initial tasks",
+ numInitialTaskManagers);
+ }
+ else {
+ throw new IllegalConfigurationException("Invalid value for " +
+ ConfigConstants.MESOS_INITIAL_TASKS + ", which must be at least zero.");
+ }
+
final int maxFailedTasks = flinkConfig.getInteger(
ConfigConstants.MESOS_MAX_FAILED_TASKS, numInitialTaskManagers);
if (maxFailedTasks >= 0) {
@@ -755,7 +775,8 @@ public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMe
workerStore,
leaderRetrievalService,
taskManagerParameters,
- taskManagerLaunchContext,
+ taskManagerContainerSpec,
+ artifactResolver,
maxFailedTasks,
numInitialTaskManagers);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1b19d08..7fae58c 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -19,10 +19,14 @@
package org.apache.flink.mesos.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import scala.Option;
import static java.util.Objects.requireNonNull;
+import static org.apache.flink.configuration.ConfigOptions.key;
/**
* This class describes the Mesos-specific parameters for launching a TaskManager process.
@@ -32,13 +36,43 @@ import static java.util.Objects.requireNonNull;
*/
public class MesosTaskManagerParameters {
- private double cpus;
+ public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
+ key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
+ .defaultValue(1);
- private ContaineredTaskManagerParameters containeredParameters;
+ public static final ConfigOption<Integer> MESOS_RM_TASKS_MEMORY_MB =
+ key("mesos.resourcemanager.tasks.mem")
+ .defaultValue(1024);
- public MesosTaskManagerParameters(double cpus, ContaineredTaskManagerParameters containeredParameters) {
+ public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
+ key("mesos.resourcemanager.tasks.cpus")
+ .defaultValue(0.0);
+
+ public static final ConfigOption<String> MESOS_RM_CONTAINER_TYPE =
+ key("mesos.resourcemanager.tasks.container.type")
+ .defaultValue("mesos");
+
+ public static final ConfigOption<String> MESOS_RM_CONTAINER_IMAGE_NAME =
+ key("mesos.resourcemanager.tasks.container.image.name")
+ .noDefaultValue();
+
+ private final double cpus;
+
+ private final ContainerType containerType;
+
+ private final Option<String> containerImageName;
+
+ private final ContaineredTaskManagerParameters containeredParameters;
+
+ public MesosTaskManagerParameters(
+ double cpus,
+ ContainerType containerType,
+ Option<String> containerImageName,
+ ContaineredTaskManagerParameters containeredParameters) {
requireNonNull(containeredParameters);
this.cpus = cpus;
+ this.containerType = containerType;
+ this.containerImageName = containerImageName;
this.containeredParameters = containeredParameters;
}
@@ -50,6 +84,22 @@ public class MesosTaskManagerParameters {
}
/**
+ * Get the container type (Mesos or Docker). The default is Mesos.
+ *
+ * Mesos provides a facility for a framework to specify which containerizer to use.
+ */
+ public ContainerType containerType() {
+ return containerType;
+ }
+
+ /**
+ * Get the container image name.
+ */
+ public Option<String> containerImageName() {
+ return containerImageName;
+ }
+
+ /**
* Get the common containered parameters.
*/
public ContaineredTaskManagerParameters containeredParameters() {
@@ -60,6 +110,8 @@ public class MesosTaskManagerParameters {
public String toString() {
return "MesosTaskManagerParameters{" +
"cpus=" + cpus +
+ ", containerType=" + containerType +
+ ", containerImageName=" + containerImageName +
", containeredParameters=" + containeredParameters +
'}';
}
@@ -67,15 +119,49 @@ public class MesosTaskManagerParameters {
/**
* Create the Mesos TaskManager parameters.
* @param flinkConfig the TM configuration.
- * @param containeredParameters additional containered parameters.
*/
- public static MesosTaskManagerParameters create(
- Configuration flinkConfig,
- ContaineredTaskManagerParameters containeredParameters) {
+ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
- double cpus = flinkConfig.getDouble(ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CPUS,
- Math.max(containeredParameters.numSlots(), 1.0));
+ // parse the common parameters
+ ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
+ flinkConfig,
+ flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
+ flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
+
+ double cpus = flinkConfig.getDouble(MESOS_RM_TASKS_CPUS);
+ if(cpus <= 0.0) { // non-positive (the option defaults to 0.0) means "not configured"
+ cpus = Math.max(containeredParameters.numSlots(), 1.0); // one CPU per slot, but never less than 1.0
+ }
+
+ // parse the containerization parameters
+ String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME); // option has no default value; null is handled below
+
+ ContainerType containerType;
+ String containerTypeString = flinkConfig.getString(MESOS_RM_CONTAINER_TYPE);
+ switch(containerTypeString) {
+ case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS:
+ containerType = ContainerType.MESOS;
+ break;
+ case ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER:
+ containerType = ContainerType.DOCKER;
+ if(imageName == null || imageName.length() == 0) { // only the docker type insists on an image name
+ throw new IllegalConfigurationException(MESOS_RM_CONTAINER_IMAGE_NAME.key() +
+ " must be specified for docker container type");
+ }
+ break;
+ default:
+ throw new IllegalConfigurationException("invalid container type: " + containerTypeString);
+ }
+
+ return new MesosTaskManagerParameters(
+ cpus,
+ containerType,
+ Option.apply(imageName), // a null image name becomes an empty Option
+ containeredParameters);
+ }
- return new MesosTaskManagerParameters(cpus, containeredParameters);
+ public enum ContainerType {
+ MESOS, // chosen by create() for ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_MESOS
+ DOCKER // chosen by create() for ConfigConstants.MESOS_RESOURCEMANAGER_TASKS_CONTAINER_TYPE_DOCKER; an image name is then mandatory
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 5100deb..75b5043 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -18,15 +18,20 @@
package org.apache.flink.mesos.runtime.clusterframework;
-import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -35,7 +40,6 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,24 +51,33 @@ public class MesosTaskManagerRunner {
private static final Logger LOG = LoggerFactory.getLogger(MesosTaskManagerRunner.class);
+ private static final Options ALL_OPTIONS;
+
+ static {
+ ALL_OPTIONS =
+ new Options()
+ .addOption(BootstrapTools.newDynamicPropertiesOption());
+ }
+
/** The process environment variables */
private static final Map<String, String> ENV = System.getenv();
- public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws IOException {
+ public static void runTaskManager(String[] args, final Class<? extends TaskManager> taskManager) throws Exception {
EnvironmentInformation.logEnvironmentInfo(LOG, taskManager.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// try to parse the command line arguments
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
final Configuration configuration;
try {
- configuration = TaskManager.parseArgsAndLoadConfig(args);
-
- // add dynamic properties to TaskManager configuration.
- final Configuration dynamicProperties =
- FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+ GlobalConfiguration.setDynamicProperties(dynamicProperties);
LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
- configuration.addAll(dynamicProperties);
+
+ configuration = GlobalConfiguration.loadConfiguration();
}
catch (Throwable t) {
LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
@@ -74,7 +87,6 @@ public class MesosTaskManagerRunner {
// read the environment variables
final Map<String, String> envs = System.getenv();
- final String effectiveUsername = envs.get(MesosConfigKeys.ENV_HADOOP_USER_NAME);
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);
// configure local directory
@@ -88,20 +100,12 @@ public class MesosTaskManagerRunner {
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
}
- final String keytab = envs.get(MesosConfigKeys.ENV_KEYTAB);
- LOG.info("Keytab file:{}", keytab);
-
- final String principal = envs.get(MesosConfigKeys.ENV_KEYTAB_PRINCIPAL);
- LOG.info("Keytab principal:{}", principal);
-
- if(keytab != null && keytab.length() != 0) {
- File f = new File(".", keytab);
- if(!f.exists()) {
- LOG.error("Could not locate keytab file:[" + keytab + "]");
- System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
- }
- configuration.setString(ConfigConstants.SECURITY_KEYTAB_KEY, keytab);
- configuration.setString(ConfigConstants.SECURITY_PRINCIPAL_KEY, principal);
+ // configure the default filesystem
+ try {
+ FileSystem.setDefaultScheme(configuration);
+ } catch (IOException e) {
+ throw new IOException("Error while setting the default " +
+ "filesystem scheme from configuration.", e);
}
// tell akka to die in case of an error
@@ -112,23 +116,17 @@ public class MesosTaskManagerRunner {
final ResourceID resourceId = new ResourceID(containerID);
LOG.info("ResourceID assigned for this container: {}", resourceId);
- String hadoopConfDir = envs.get(MesosConfigKeys.ENV_HADOOP_CONF_DIR);
- LOG.info("hadoopConfDir: {}", hadoopConfDir);
-
+ // Run the TM in the security context
SecurityUtils.SecurityConfiguration sc = new SecurityUtils.SecurityConfiguration(configuration);
- if(hadoopConfDir != null && hadoopConfDir.length() != 0) {
- sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
- }
+ sc.setHadoopConfiguration(HadoopUtils.getHadoopConfiguration());
+ SecurityUtils.install(sc);
try {
- SecurityUtils.install(sc);
- LOG.info("Mesos task runs as '{}', setting user to execute Flink TaskManager to '{}'",
- UserGroupInformation.getCurrentUser().getShortUserName(), effectiveUsername);
- SecurityUtils.getInstalledContext().runSecured(new Callable<Object>() {
+ SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
- public Object call() throws Exception {
+ public Integer call() throws Exception {
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, taskManager);
- return null;
+ return 0;
}
});
}
@@ -136,6 +134,5 @@ public class MesosTaskManagerRunner {
LOG.error("Error while starting the TaskManager", t);
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
}
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
new file mode 100644
index 0000000..a6a26dc
--- /dev/null
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactResolver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import org.apache.flink.core.fs.Path;
+import scala.Option;
+
+import java.net.URL;
+
+/**
+ * An interface for resolving artifact URIs.
+ */
+public interface MesosArtifactResolver {
+ Option<URL> resolve(Path remoteFile); // maps a remote path to its URL; NOTE(review): presumably an empty Option when the path is unknown - confirm per implementation
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
index fbf61ac..37cb260 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java
@@ -26,7 +26,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.DefaultFileRegion;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
@@ -43,24 +42,32 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.codec.http.router.Router;
+import io.netty.handler.stream.ChunkedStream;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.jets3t.service.utils.Mimetypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Option;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
@@ -82,7 +89,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
* http://mesos.apache.org/documentation/latest/fetcher/
* http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
*/
-public class MesosArtifactServer {
+public class MesosArtifactServer implements MesosArtifactResolver {
private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class);
@@ -92,17 +99,19 @@ public class MesosArtifactServer {
private Channel serverChannel;
- private URL baseURL;
+ private final URL baseURL;
+
+ private final Map<Path,URL> paths = new HashMap<>();
private final SSLContext serverSSLContext;
- public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort, Configuration config)
- throws Exception {
+ public MesosArtifactServer(String prefix, String serverHostname, int configuredPort, Configuration config)
+ throws Exception {
if (configuredPort < 0 || configuredPort > 0xFFFF) {
throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
}
- // Config to enable https access to the web-ui
+ // Config to enable https access to the artifact server
boolean enableSSL = config.getBoolean(
ConfigConstants.MESOS_ARTIFACT_SERVER_SSL_ENABLED,
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_SSL_ENABLED) &&
@@ -136,6 +145,7 @@ public class MesosArtifactServer {
ch.pipeline()
.addLast(new HttpServerCodec())
+ .addLast(new ChunkedWriteHandler())
.addLast(handler.name(), handler)
.addLast(new UnknownFileHandler());
}
@@ -159,11 +169,15 @@ public class MesosArtifactServer {
String httpProtocol = (serverSSLContext != null) ? "https": "http";
- baseURL = new URL(httpProtocol, serverHostname, port, "/" + sessionID + "/");
+ baseURL = new URL(httpProtocol, serverHostname, port, "/" + prefix + "/");
LOG.info("Mesos Artifact Server Base URL: {}, listening at {}:{}", baseURL, address, port);
}
+ public URL baseURL() {
+ return baseURL; // protocol://serverHostname:port/prefix/ as assembled in the constructor; added paths are resolved against it
+ }
+
/**
* Get the server port on which the artifact server is listening.
*/
@@ -185,13 +199,51 @@ public class MesosArtifactServer {
* @param remoteFile the remote path with which to locate the file.
* @return the fully-qualified remote path to the file.
* @throws MalformedURLException if the remote path is invalid.
+ */
+ public synchronized URL addFile(File localFile, String remoteFile) throws IOException, MalformedURLException {
+ return addPath(new Path(localFile.toURI()), new Path(remoteFile)); // thin wrapper: converts both arguments to Path and delegates to addPath
+ }
+
+ /**
+ * Adds a path to the artifact server.
+ * @param path the qualified FS path to serve (local, hdfs, etc).
+ * @param remoteFile the remote path with which to locate the file.
+ * @return the fully-qualified remote path to the file.
+ * @throws MalformedURLException if the remote path is invalid.
*/
- public synchronized URL addFile(File localFile, String remoteFile) throws MalformedURLException {
- URL fileURL = new URL(baseURL, remoteFile);
- router.ANY(fileURL.getPath(), new VirtualFileServerHandler(localFile));
+ public synchronized URL addPath(Path path, Path remoteFile) throws IOException, MalformedURLException {
+ if(paths.containsKey(remoteFile)) { // each remote path may be registered only once
+ throw new IllegalArgumentException("duplicate path registered");
+ }
+ if(remoteFile.isAbsolute()) { // the remote path is resolved relative to baseURL below
+ throw new IllegalArgumentException("not expecting an absolute path");
+ }
+ URL fileURL = new URL(baseURL, remoteFile.toString());
+ router.ANY(fileURL.getPath(), new VirtualFileServerHandler(path)); // the handler is constructed first and validates 'path', so a bad path throws before a route is added
+
+ paths.put(remoteFile, fileURL); // remembered so that resolve() can hand out the URL
+
return fileURL;
}
+ /**
+ * Removes a previously added path; does nothing if the path is not registered.
+ * @param remoteFile the remote path under which the file was added.
+ */
+ public synchronized void removePath(Path remoteFile) {
+ // forget the registration too, so that resolve() stops returning the URL and the path can be added again
+ URL fileURL = paths.remove(remoteFile);
+ if(fileURL != null) {
+ router.removePath(fileURL.getPath());
+ }
+ }
+
+ @Override
+ public synchronized Option<URL> resolve(Path remoteFile) {
+ // Option.apply turns a missing (null) map entry into an empty Option
+ return Option.apply(paths.get(remoteFile));
+ }
+
/**
* Stops the artifact server.
* @throws Exception
@@ -215,12 +267,17 @@ public class MesosArtifactServer {
@ChannelHandler.Sharable
public static class VirtualFileServerHandler extends SimpleChannelInboundHandler<Routed> {
- private final File file;
+ private FileSystem fs;
+ private Path path;
- public VirtualFileServerHandler(File file) {
- this.file = file;
- if(!file.exists()) {
- throw new IllegalArgumentException("no such file: " + file.getAbsolutePath());
+ public VirtualFileServerHandler(Path path) throws IOException {
+ this.path = path;
+ if(!path.isAbsolute()) {
+ throw new IllegalArgumentException("path must be absolute: " + path.toString());
+ }
+ this.fs = path.getFileSystem(); // looked up once here; the request handler reuses it to stat and open the file
+ if(!fs.exists(path) || fs.getFileStatus(path).isDir()) { // reject missing paths and directories up front
+ throw new IllegalArgumentException("no such file: " + path.toString());
}
}
@@ -230,7 +287,7 @@ public class MesosArtifactServer {
HttpRequest request = routed.request();
if (LOG.isDebugEnabled()) {
- LOG.debug("{} request for file '{}'", request.getMethod(), file.getAbsolutePath());
+ LOG.debug("{} request for file '{}'", request.getMethod(), path);
}
if(!(request.getMethod() == GET || request.getMethod() == HEAD)) {
@@ -238,47 +295,40 @@ public class MesosArtifactServer {
return;
}
- final RandomAccessFile raf;
+
+ final FileStatus status;
try {
- raf = new RandomAccessFile(file, "r");
+ status = fs.getFileStatus(path);
}
- catch (FileNotFoundException e) {
+ catch (IOException e) {
+ LOG.error("unable to stat file", e);
sendError(ctx, GONE);
return;
}
- try {
- long fileLength = raf.length();
- // compose the response
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- if (HttpHeaders.isKeepAlive(request)) {
- response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- }
- HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
- HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
- HttpHeaders.setContentLength(response, fileLength);
+ // compose the response
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ HttpHeaders.setHeader(response, CONNECTION, HttpHeaders.Values.CLOSE);
+ HttpHeaders.setHeader(response, CACHE_CONTROL, "private");
+ HttpHeaders.setHeader(response, CONTENT_TYPE, Mimetypes.MIMETYPE_OCTET_STREAM);
+ HttpHeaders.setContentLength(response, status.getLen());
- ctx.write(response);
+ ctx.write(response);
- if (request.getMethod() == GET) {
- // write the content. Netty's DefaultFileRegion will close the file.
- ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+ if (request.getMethod() == GET) {
+ // write the content. Netty will close the stream.
+ final FSDataInputStream stream = fs.open(path);
+ try {
+ ctx.write(new ChunkedStream(stream));
}
- else {
- // close the file immediately in HEAD case
- raf.close();
+ catch(Exception e) {
+ stream.close();
+ throw e;
}
- ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
- // close the connection, if no keep-alive is needed
- if (!HttpHeaders.isKeepAlive(request)) {
- lastContentFuture.addListener(ChannelFutureListener.CLOSE);
- }
- }
- catch(Exception ex) {
- raf.close();
- throw ex;
}
+
+ ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/230bf17b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
index f287e13..93ccf68 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java
@@ -33,10 +33,12 @@ import org.apache.flink.mesos.scheduler.LaunchCoordinator;
import org.apache.flink.mesos.scheduler.TaskMonitor;
import org.apache.flink.mesos.scheduler.messages.*;
import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.util.MesosArtifactResolver;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.messages.*;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -79,6 +81,7 @@ public class MesosFlinkResourceManagerTest {
private static Configuration config = new Configuration() {{
setInteger(ConfigConstants.MESOS_MAX_FAILED_TASKS, -1);
+ setInteger(ConfigConstants.MESOS_INITIAL_TASKS, 0);
}};
@BeforeClass
@@ -107,12 +110,13 @@ public class MesosFlinkResourceManagerTest {
MesosWorkerStore workerStore,
LeaderRetrievalService leaderRetrievalService,
MesosTaskManagerParameters taskManagerParameters,
- Protos.TaskInfo.Builder taskManagerLaunchContext,
+ ContainerSpecification taskManagerContainerSpec,
+ MesosArtifactResolver artifactResolver,
int maxFailedTasks,
int numInitialTaskManagers) {
super(flinkConfig, mesosConfig, workerStore, leaderRetrievalService, taskManagerParameters,
- taskManagerLaunchContext, maxFailedTasks, numInitialTaskManagers);
+ taskManagerContainerSpec, artifactResolver, maxFailedTasks, numInitialTaskManagers);
}
@Override
@@ -141,6 +145,7 @@ public class MesosFlinkResourceManagerTest {
public LeaderRetrievalService retrievalService;
public MesosConfiguration mesosConfig;
public MesosWorkerStore workerStore;
+ public MesosArtifactResolver artifactResolver;
public SchedulerDriver schedulerDriver;
public TestingMesosFlinkResourceManager resourceManagerInstance;
public ActorGateway resourceManager;
@@ -176,6 +181,9 @@ public class MesosFlinkResourceManagerTest {
// worker store
workerStore = mock(MesosWorkerStore.class);
when(workerStore.getFrameworkID()).thenReturn(Option.<Protos.FrameworkID>empty());
+
+ // artifact
+ artifactResolver = mock(MesosArtifactResolver.class);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -185,15 +193,16 @@ public class MesosFlinkResourceManagerTest {
* Initialize the resource manager.
*/
public void initialize() {
+ ContainerSpecification containerSpecification = new ContainerSpecification();
ContaineredTaskManagerParameters containeredParams =
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
- MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(1.0, containeredParams);
- Protos.TaskInfo.Builder taskInfo = Protos.TaskInfo.newBuilder();
+ MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
+ 1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams);
TestActorRef<TestingMesosFlinkResourceManager> resourceManagerRef =
TestActorRef.create(system, MesosFlinkResourceManager.createActorProps(
TestingMesosFlinkResourceManager.class,
- config, mesosConfig, workerStore, retrievalService, tmParams, taskInfo, 0, LOG));
+ config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG));
resourceManagerInstance = resourceManagerRef.underlyingActor();
resourceManager = new AkkaActorGateway(resourceManagerRef, null);