Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:23:09 UTC

[50/52] [abbrv] flink git commit: [FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner

[FLINK-4928] [yarn] Implement FLIP-6 YARN Application Master Runner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e28b116c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e28b116c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e28b116c

Branch: refs/heads/master
Commit: e28b116cb59f53e2a4ec263789a01114f9c393f0
Parents: cbfb807
Author: shuai.xus <sh...@alibaba-inc.com>
Authored: Thu Nov 3 16:24:47 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:27 2016 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../apache/flink/yarn/YarnResourceManager.java  | 552 +++++++++++++++++++
 2 files changed, 553 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e28b116c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 76b4a86..3bcbfda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -637,7 +637,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 *
 	 * @param t The exception describing the fatal error
 	 */
-	void onFatalErrorAsync(final Throwable t) {
+	protected void onFatalErrorAsync(final Throwable t) {
 		runAsync(new Runnable() {
 			@Override
 			public void run() {

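Note on the hunk above: widening onFatalErrorAsync() from package-private to protected is what allows the new YarnResourceManager (added below in the flink-yarn module, i.e. a different package) to forward failures from the AMRMClient callback thread into the ResourceManager's fatal error handling. A minimal sketch of that pattern, mirroring the onError() override further down in this commit:

    @Override
    public void onError(Throwable error) {
        // called on the AMRMClient callback thread; hand the error off to the RPC main thread
        onFatalErrorAsync(error);
    }
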
http://git-wip-us.apache.org/repos/asf/flink/blob/e28b116c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
new file mode 100644
index 0000000..6280bdf
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
+/**
+ * The YARN implementation of the resource manager. Used when the system is started
+ * with YARN as the resource framework.
+ */
+public class YarnResourceManager extends ResourceManager<ResourceID> implements AMRMClientAsync.CallbackHandler {
+	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+	/** The process environment variables */
+	private final Map<String, String> ENV;
+
+	/** The heartbeat interval while the resource master is waiting for containers */
+	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+	/** The default heartbeat interval during regular operation */
+	private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+	/** The maximum time that TaskExecutors may wait to register at the ResourceManager before they quit */
+	private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+	/** Environment variable name of the final container id used by the YarnResourceManager.
+	 * Container ID generation may vary across Hadoop versions. */
+	final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+	
+	/** Environment variable name of the hostname given by YARN.
+	 * The TaskExecutor uses this host name when starting its RPC service. */
+	final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+	/** The heartbeat interval between this resource manager and the YARN ResourceManager */
+	private final int yarnHeartbeatIntervalMillis;
+
+	private final Configuration flinkConfig;
+
+	private final YarnConfiguration yarnConfig;
+
+	/** Client to communicate with the Resource Manager (YARN's master) */
+	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
+
+	/** Client to communicate with the Node manager and launch TaskExecutor processes */
+	private NMClient nodeManagerClient;
+
+	/** The number of containers requested, but not yet granted */
+	private int numPendingContainerRequests;
+
+	public YarnResourceManager(
+			Configuration flinkConfig,
+			Map<String, String> env,
+			RpcService rpcService,
+			ResourceManagerConfiguration resourceManagerConfiguration,
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManagerFactory slotManagerFactory,
+			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
+			FatalErrorHandler fatalErrorHandler) {
+		super(
+			rpcService,
+			resourceManagerConfiguration,
+			highAvailabilityServices,
+			slotManagerFactory,
+			metricRegistry,
+			jobLeaderIdService,
+			fatalErrorHandler);
+		this.flinkConfig  = flinkConfig;
+		this.yarnConfig = new YarnConfiguration();
+		this.ENV = env;
+		final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+				ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, DEFAULT_YARN_HEARTBEAT_INTERVAL_MS / 1000) * 1000;
+
+		final long yarnExpiryIntervalMS = yarnConfig.getLong(
+				YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+				YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+		if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+			log.warn("The heartbeat interval of the Flink Application master ({}) is greater " +
+					"than YARN's expiry interval ({}). The application is likely to be killed by YARN.",
+					yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+		}
+		yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+		numPendingContainerRequests = 0;
+	}
+
+	@Override
+	protected void initialize() throws ResourceManagerException {
+		resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(yarnHeartbeatIntervalMillis, this);
+		resourceManagerClient.init(yarnConfig);
+		resourceManagerClient.start();
+		try {
+			//TODO: change the akka address to a plain TCP host and port; getAddress() should return a standard TCP address
+			Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
+			//TODO: the third parameter should be the web monitor address
+			resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, getAddress());
+		} catch (Exception e) {
+			throw new ResourceManagerException("Could not register the application master.", e);
+		}
+
+		// create the client to communicate with the node managers
+		nodeManagerClient = NMClient.createNMClient();
+		nodeManagerClient.init(yarnConfig);
+		nodeManagerClient.start();
+		nodeManagerClient.cleanupRunningContainersOnStop(true);
+	}
+
+	@Override
+	public void shutDown() throws Exception {
+		// shut down all components
+		if (resourceManagerClient != null) {
+			try {
+				resourceManagerClient.stop();
+			} catch (Throwable t) {
+				LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+			}
+		}
+		if (nodeManagerClient != null) {
+			try {
+				nodeManagerClient.stop();
+			} catch (Throwable t) {
+				LOG.error("Could not cleanly shut down the Node Manager Client", t);
+			}
+		}
+		super.shutDown();
+	}
+
+	@Override
+	protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+		// first, de-register from YARN
+		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+		LOG.info("Unregistering application from the YARN Resource Manager");
+		try {
+			resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
+		} catch (Throwable t) {
+			LOG.error("Could not unregister the application master.", t);
+		}
+	}
+
+	@Override
+	public void startNewWorker(ResourceProfile resourceProfile) {
+		// Priority for worker containers - priorities are intra-application
+		//TODO: set priority according to the resource allocated
+		Priority priority = Priority.newInstance(0);
+		int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE;
+		if (mem < 0) {
+			mem = 1024;
+		}
+		int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores() + 1;
+		Resource capability = Resource.newInstance(mem, vcore);
+		requestYarnContainer(capability, priority);
+	}
+
+	@Override
+	protected ResourceID workerStarted(ResourceID resourceID) {
+		return resourceID;
+	}
+
+	// AMRMClientAsync CallbackHandler methods
+	@Override
+	public float getProgress() {
+		// For now we do not track the numbers of requested and allocated containers, so always report full progress
+		return 1;
+	}
+
+	@Override
+	public void onContainersCompleted(List<ContainerStatus> list) {
+		for (ContainerStatus container : list) {
+			if (container.getExitStatus() < 0) {
+				notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics());
+				// TODO: notify the JobMaster that the slot has failed
+			}
+		}
+	}
+
+	@Override
+	public void onContainersAllocated(List<Container> containers) {
+		for (Container container : containers) {
+			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+			LOG.info("Received new container: {} - Remaining pending container requests: {}",
+					container.getId(), numPendingContainerRequests);
+			try {
+				// Context information used to start a TaskExecutor Java process
+				ContainerLaunchContext taskExecutorLaunchContext =
+						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
+				nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+			}
+			catch (Throwable t) {
+				// failed to launch the container, will release the failed one and ask for a new one
+				LOG.error("Could not start TaskManager in container " + container, t);
+				resourceManagerClient.releaseAssignedContainer(container.getId());
+				requestYarnContainer(container.getResource(), container.getPriority());
+			}
+		}
+		if (numPendingContainerRequests <= 0) {
+			resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+		}
+	}
+
+	@Override
+	public void onShutdownRequest() {
+		// Nothing to do
+	}
+
+	@Override
+	public void onNodesUpdated(List<NodeReport> list) {
+		// We are not interested in node updates
+	}
+
+	@Override
+	public void onError(Throwable error) {
+		onFatalErrorAsync(error);
+	}
+
+	// Utility methods
+	/**
+	 * Converts a Flink application status enum to a YARN application status enum.
+	 * @param status The Flink application status.
+	 * @return The corresponding YARN application status.
+	 */
+	private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
+		if (status == null) {
+			return FinalApplicationStatus.UNDEFINED;
+		}
+		else {
+			switch (status) {
+				case SUCCEEDED:
+					return FinalApplicationStatus.SUCCEEDED;
+				case FAILED:
+					return FinalApplicationStatus.FAILED;
+				case CANCELED:
+					return FinalApplicationStatus.KILLED;
+				default:
+					return FinalApplicationStatus.UNDEFINED;
+			}
+		}
+	}
+
+	// Parse the host and port from an akka address,
+	// e.g. akka.tcp://flink@100.81.153.180:49712/user/$a
+	private static Tuple2<String, Integer> parseHostPort(String address) {
+		String[] hostPort = address.split("@")[1].split(":");
+		String host = hostPort[0];
+		String port = hostPort[1].split("/")[0];
+		return new Tuple2<>(host, Integer.valueOf(port));
+	}
+
+	private void requestYarnContainer(Resource resource, Priority priority) {
+		resourceManagerClient.addContainerRequest(
+				new AMRMClient.ContainerRequest(resource, null, null, priority));
+		// make sure we transmit the request fast and receive fast news of granted allocations
+		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+
+		numPendingContainerRequests++;
+		LOG.info("Requesting new TaskManager container. Pending container requests: {}",
+				numPendingContainerRequests);
+	}
+
+	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
+			throws Exception {
+		// init the ContainerLaunchContext
+		final String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
+
+		final ContaineredTaskManagerParameters taskManagerParameters =
+				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), 1);
+
+		LOG.info("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
+				"JVM direct memory limit {} MB",
+				containerId,
+				taskManagerParameters.taskManagerTotalMemoryMB(),
+				taskManagerParameters.taskManagerHeapSizeMB(),
+				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
+				flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT);
+		LOG.debug("TaskManager configuration: {}", taskManagerConfig);
+
+		ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
+				flinkConfig, yarnConfig, ENV,
+				taskManagerParameters, taskManagerConfig,
+				currDir, YarnTaskExecutorRunner.class, LOG);
+
+		// set a special environment variable to uniquely identify this container
+		taskExecutorLaunchContext.getEnvironment()
+				.put(ENV_FLINK_CONTAINER_ID, containerId);
+		taskExecutorLaunchContext.getEnvironment()
+				.put(ENV_FLINK_NODE_ID, host);
+		return taskExecutorLaunchContext;
+	}
+
+
+	/**
+	 * Creates the launch context, which describes how to bring up a TaskExecutor process in
+	 * an allocated YARN container.
+	 *
+	 * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
+	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+	 * container launch context. The launch context then ensures that those resources will be
+	 * copied into the container's transient working directory.
+	 *
+	 * @param flinkConfig
+	 *		 The Flink configuration object.
+	 * @param yarnConfig
+	 *		 The YARN configuration object.
+	 * @param env
+	 *		 The environment variables.
+	 * @param tmParams
+	 *		 The TaskExecutor container memory parameters.
+	 * @param taskManagerConfig
+	 *		 The configuration for the TaskExecutors.
+	 * @param workingDirectory
+	 *		 The current application master container's working directory.
+	 * @param taskManagerMainClass
+	 *		 The class with the main method.
+	 * @param log
+	 *		 The logger.
+	 *
+	 * @return The launch context for the TaskManager processes.
+	 *
+	 * @throws Exception Thrown if the launch context could not be created, for example if
+	 *				   the resources could not be copied.
+	 */
+	private static ContainerLaunchContext createTaskExecutorContext(
+			Configuration flinkConfig,
+			YarnConfiguration yarnConfig,
+			Map<String, String> env,
+			ContaineredTaskManagerParameters tmParams,
+			Configuration taskManagerConfig,
+			String workingDirectory,
+			Class<?> taskManagerMainClass,
+			Logger log) throws Exception {
+
+		// get and validate all relevant variables
+
+		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+		
+		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+
+		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+		log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+		log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
+
+		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+		log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+		// obtain a handle to the file system used by YARN
+		final org.apache.hadoop.fs.FileSystem yarnFileSystem;
+		try {
+			yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
+		} catch (IOException e) {
+			throw new Exception("Could not access YARN's default file system", e);
+		}
+
+		//register keytab
+		LocalResource keytabResource = null;
+		if(remoteKeytabPath != null) {
+			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(remoteKeytabPath);
+			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		LocalResource yarnConfResource = null;
+		LocalResource krb5ConfResource = null;
+		boolean hasKrb5 = false;
+		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+			yarnConfResource = Records.newRecord(LocalResource.class);
+			Path yarnConfPath = new Path(remoteYarnConfPath);
+			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
+
+			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+			krb5ConfResource = Records.newRecord(LocalResource.class);
+			Path krb5ConfPath = new Path(remoteKrb5Path);
+			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
+
+			hasKrb5 = true;
+		}
+
+		// register Flink Jar with remote HDFS
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		{
+			Path remoteJarPath = new Path(remoteFlinkJarPath);
+			Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
+		}
+
+		// register conf with local fs
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		{
+			// write the TaskManager configuration to a local file
+			final File taskManagerConfigFile =
+					new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+			log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+			Utils.setupLocalResource(yarnFileSystem, appId,
+					new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+			log.info("Prepared local resource for modified yaml: {}", flinkConf);
+		}
+
+		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		//To support Yarn Secure Integration Test Scenario
+		if(yarnConfResource != null && krb5ConfResource != null) {
+			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
+			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
+		}
+
+		if(keytabResource != null) {
+			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
+		}
+
+		// prepare additional files to be shipped
+		for (String pathStr : shipListString.split(",")) {
+			if (!pathStr.isEmpty()) {
+				LocalResource resource = Records.newRecord(LocalResource.class);
+				Path path = new Path(pathStr);
+				Utils.registerLocalResource(yarnFileSystem, path, resource);
+				taskManagerLocalResources.put(path.getName(), resource);
+			}
+		}
+
+		// now that all resources are prepared, we can create the launch context
+
+		log.info("Creating container launch context for TaskManagers");
+
+		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+				flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+				hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
+
+		log.info("Starting TaskManagers with command: " + launchCommand);
+
+		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+		ctx.setCommands(Collections.singletonList(launchCommand));
+		ctx.setLocalResources(taskManagerLocalResources);
+
+		Map<String, String> containerEnv = new HashMap<>();
+		containerEnv.putAll(tmParams.taskManagerEnv());
+
+		// add YARN classpath, etc to the container environment
+		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+		Utils.setupYarnClassPath(yarnConfig, containerEnv);
+
+		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		ctx.setEnvironment(containerEnv);
+
+		try (DataOutputBuffer dob = new DataOutputBuffer()) {
+			log.debug("Adding security tokens to Task Executor Container launch Context....");
+			UserGroupInformation user = UserGroupInformation.getCurrentUser();
+			Credentials credentials = user.getCredentials();
+			credentials.writeTokenStorageToStream(dob);
+			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+			ctx.setTokens(securityTokens);
+		}
+		catch (Throwable t) {
+			log.error("Getting current user info failed when trying to launch the container", t);
+		}
+
+		return ctx;
+	}
+}
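
For reference, the akka address parsing in parseHostPort() above can be traced with a small standalone sketch (not part of the commit; the class name is made up for illustration), using the example address from the comment:

    public class ParseHostPortDemo {
        public static void main(String[] args) {
            String address = "akka.tcp://flink@100.81.153.180:49712/user/$a";
            String[] hostPort = address.split("@")[1].split(":"); // ["100.81.153.180", "49712/user/$a"]
            String host = hostPort[0];                            // "100.81.153.180"
            String port = hostPort[1].split("/")[0];              // "49712"
            System.out.println(host + ":" + port);                // prints 100.81.153.180:49712
        }
    }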