You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/01/18 11:16:37 UTC

flink git commit: [FLINK-3150] Make YARN container invocation configurable

Repository: flink
Updated Branches:
  refs/heads/master 6fb6967b9 -> 8f4139a42


[FLINK-3150] Make YARN container invocation configurable

This closes #3056


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f4139a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f4139a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f4139a4

Branch: refs/heads/master
Commit: 8f4139a42a619738a4efa45ea0b5d20009718aba
Parents: 6fb6967
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jan 2 16:17:09 2017 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Jan 18 11:17:02 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   5 +
 .../flink/configuration/ConfigConstants.java    |  12 ++
 .../clusterframework/BootstrapTools.java        |  95 +++++++++---
 .../clusterframework/BootstrapToolsTest.java    | 148 ++++++++++++++++++
 .../yarn/AbstractYarnClusterDescriptor.java     |  46 ++++--
 .../flink/yarn/YarnClusterDescriptorTest.java   | 149 +++++++++++++++++++
 6 files changed, 416 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 1b2be8a..1a72e27 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -425,6 +425,11 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use
 
 - `yarn.taskmanager.env.` Similar to the configuration prefix above, this prefix allows setting custom environment variables for the TaskManager processes.
 
+- `yarn.container-start-command-template`: Flink uses the following template when starting on YARN:
+`%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%`. This configuration parameter allows users
+to pass custom settings (such as JVM paths, arguments etc.). Note that in most cases, it is sufficient to
+use the `env.java.opts` setting, whose value is substituted for the `%jvmopts%` variable in the template.
+
 - `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port) With this configuration option, users can specify a port, a range of ports or a list of ports for the  Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the  same physical host, fixed port assignments prevent the AM from starting.
 
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index fc389e0..14ba9dd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -423,6 +423,12 @@ public final class ConfigConstants {
 	 */
 	@Deprecated
 	public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
+
+	/**
+	 * Template for the YARN container start invocation.
+	 */
+	public static final String YARN_CONTAINER_START_COMMAND_TEMPLATE =
+		"yarn.container-start-command-template";
 	
 	 /**
 	 * The config parameter defining the Akka actor system port for the ApplicationMaster and
@@ -1146,6 +1152,12 @@ public final class ConfigConstants {
 	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
 
 	/**
+	 * Default start command template for Flink on YARN containers.
+	 */
+	public static final String DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE =
+		"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";
+
+	/**
 	 * Default port for the application master is 0, which means
 	 * the operating system assigns an ephemeral port
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 2a33c44..1ef2cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.NetUtils;
 
 import org.slf4j.Logger;
 
+import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
@@ -44,13 +45,16 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.BindException;
 import java.net.ServerSocket;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Tools for starting JobManager and TaskManager processes, including the
  * Actor Systems used to run the JobManager and TaskManager actors.
  */
 public class BootstrapTools {
+	private static final Logger LOG = LoggerFactory.getLogger(BootstrapTools.class);
 
 	/**
 	 * Starts an ActorSystem with the given configuration listening at the address/ports.
@@ -347,38 +351,48 @@ public class BootstrapTools {
 			boolean hasKrb5,
 			Class<?> mainClass) {
 
-		StringBuilder tmCommand = new StringBuilder("$JAVA_HOME/bin/java");
-		tmCommand.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		tmCommand.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		tmCommand.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
-
-		String  javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-		tmCommand.append(' ').append(javaOpts);
+		final Map<String, String> startCommandValues = new HashMap<>();
+		startCommandValues.put("java", "$JAVA_HOME/bin/java");
+		startCommandValues
+			.put("jvmmem", 	"-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
+							"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
+							"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
+		String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		//applicable only for YarnMiniCluster secure test run
+		//krb5.conf file will be available as local resource in JM/TM container
+		if(hasKrb5) {
+			javaOpts += " -Djava.security.krb5.conf=krb5.conf";
+		}
+		startCommandValues.put("jvmopts", javaOpts);
 
+		String logging = "";
 		if (hasLogback || hasLog4j) {
-			tmCommand.append(" -Dlog.file=").append(logDirectory).append("/taskmanager.log");
+			logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
 			if (hasLogback) {
-				tmCommand.append(" -Dlogback.configurationFile=file:")
-						.append(configDirectory).append("/logback.xml");
+				logging +=
+					" -Dlogback.configurationFile=file:" + configDirectory +
+						"/logback.xml";
 			}
 			if (hasLog4j) {
-				tmCommand.append(" -Dlog4j.configuration=file:")
-						.append(configDirectory).append("/log4j.properties");
-			}
-
-			//applicable only for YarnMiniCluster secure test run
-			//krb5.conf file will be available as local resource in JM/TM container
-			if(hasKrb5) {
-				tmCommand.append(" -Djava.security.krb5.conf=krb5.conf");
+				logging += " -Dlog4j.configuration=file:" + configDirectory +
+					"/log4j.properties";
 			}
 		}
 
-		tmCommand.append(' ').append(mainClass.getName());
-		tmCommand.append(" --configDir ").append(configDirectory);
-		tmCommand.append(" 1> ").append(logDirectory).append("/taskmanager.out");
-		tmCommand.append(" 2> ").append(logDirectory).append("/taskmanager.err");
+		startCommandValues.put("logging", logging);
+		startCommandValues.put("class", mainClass.getName());
+		startCommandValues.put("redirects",
+			"1> " + logDirectory + "/taskmanager.out " +
+			"2> " + logDirectory + "/taskmanager.err");
+		startCommandValues.put("args", "--configDir " + configDirectory);
 
-		return tmCommand.toString();
+		final String commandTemplate = flinkConfig
+			.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
+		String startCommand = getStartCommand(commandTemplate, startCommandValues);
+		LOG.debug("TaskManager start command: " + startCommand);
+
+		return startCommand;
 	}
 
 
@@ -386,4 +400,39 @@ public class BootstrapTools {
 
 	/** Private constructor to prevent instantiation */
 	private BootstrapTools() {}
+
+	/**
+	 * Replaces placeholders in the template start command with values from
+	 * <tt>startCommandValues</tt>.
+	 *
+	 * <p>If the default template {@link ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}
+	 * is used, the following keys must be present in the map or the resulting
+	 * command will still contain placeholders:
+	 * <ul>
+	 * <li><tt>java</tt> = path to the Java executable</li>
+	 * <li><tt>jvmmem</tt> = JVM memory limits and tweaks</li>
+	 * <li><tt>jvmopts</tt> = misc options for the Java VM</li>
+	 * <li><tt>logging</tt> = logging-related configuration settings</li>
+	 * <li><tt>class</tt> = main class to execute</li>
+	 * <li><tt>args</tt> = arguments for the main class</li>
+	 * <li><tt>redirects</tt> = output redirects</li>
+	 * </ul>
+	 * </p>
+	 *
+	 * @param template
+	 * 		a template start command with placeholders
+	 * @param startCommandValues
+	 * 		a replacement map <tt>placeholder -&gt; value</tt>
+	 *
+	 * @return the start command with placeholders filled in
+	 */
+	public static String getStartCommand(String template,
+		Map<String, String> startCommandValues) {
+		for (Map.Entry<String, String> variable : startCommandValues
+			.entrySet()) {
+			template = template
+				.replace("%" + variable.getKey() + "%", variable.getValue());
+		}
+		return template;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 9ab40fd..b08e1f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
+import java.util.HashMap;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -106,4 +109,149 @@ public class BootstrapToolsTest {
 			assertFalse(key.startsWith(deprecatedPrefix3));
 		}
 	}
+
+	@Test
+	public void testGetTaskManagerShellCommand() {
+		final Configuration cfg = new Configuration();
+		final ContaineredTaskManagerParameters containeredParams =
+			new ContaineredTaskManagerParameters(1024, 768, 256, 4,
+				new HashMap<String, String>());
+
+		// no logging, with/out krb5
+		final String java = "$JAVA_HOME/bin/java";
+		final String jvmmem = "-Xms768m -Xmx768m -XX:MaxDirectMemorySize=256m";
+		final String jvmOpts = "-Djvm"; // if set
+		final String logfile = "-Dlog.file=./logs/taskmanager.log"; // if set
+		final String logback =
+			"-Dlogback.configurationFile=file:./conf/logback.xml"; // if set
+		final String log4j =
+			"-Dlog4j.configuration=file:./conf/log4j.properties"; // if set
+		final String mainClass =
+			"org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
+		final String args = "--configDir ./conf";
+		final String redirects =
+			"1> ./logs/taskmanager.out 2> ./logs/taskmanager.err";
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + // logging
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, false, false, this.getClass()));
+
+		final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 + // jvmOpts
+				" " + // logging
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, false, true, this.getClass()));
+
+		// logback only, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + logback +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, false, false, this.getClass()));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 + // jvmOpts
+				" " + logfile + " " + logback +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, false, true, this.getClass()));
+
+		// log4j, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, true, false, this.getClass()));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 + // jvmOpts
+				" " + logfile + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, true, true, this.getClass()));
+
+		// logback + log4j, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, false, this.getClass()));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 + // jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, true, this.getClass()));
+
+		// logback + log4j, with/out krb5, different JVM opts
+		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, false, this.getClass()));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + krb5 + // jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, true, this.getClass()));
+
+		// now try some configurations with different yarn.container-start-command-template
+
+		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
+		assertEquals(
+			java + " 1 " + jvmmem +
+				" 2 " + jvmOpts + " " + krb5 + // jvmOpts
+				" 3 " + logfile + " " + logback + " " + log4j +
+				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, true, this.getClass()));
+
+		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+			"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+		assertEquals(
+			java +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + jvmOpts + " " + krb5 + // jvmOpts
+				" " + jvmmem +
+				" " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					true, true, true, this.getClass()));
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 6cf3997..6d54c5e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
@@ -1175,37 +1176,50 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
-		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		String javaOpts =
+			flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+		//applicable only for YarnMiniCluster secure test run
+		//krb5.conf file will be available as local resource in JM/TM container
+		if (hasKrb5) {
+			javaOpts += " -Djava.security.krb5.conf=krb5.conf";
+		}
 
 		// Set up the container launch context for the application master
 		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
-		String amCommand = "$JAVA_HOME/bin/java"
-			+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration)
-			+ "M " + javaOpts;
+		final  Map<String, String> startCommandValues = new HashMap<>();
+		startCommandValues.put("java", "$JAVA_HOME/bin/java");
+		startCommandValues.put("jvmmem", "-Xmx" +
+			Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) +
+			"m");
+		startCommandValues.put("jvmopts", javaOpts);
+		String logging = "";
 
 		if (hasLogback || hasLog4j) {
-			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
+			logging = "-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"";
 
 			if(hasLogback) {
-				amCommand += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
+				logging += " -Dlogback.configurationFile=file:" + CONFIG_FILE_LOGBACK_NAME;
 			}
 
 			if(hasLog4j) {
-				amCommand += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
+				logging += " -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME;
 			}
 		}
 
-		//applicable only for YarnMiniCluster secure test run
-		//krb5.conf file will be available as local resource in JM/TM container
-		if(hasKrb5) {
-			amCommand += " -Djava.security.krb5.conf=krb5.conf";
-		}
+		startCommandValues.put("logging", logging);
+		startCommandValues.put("class", getApplicationMasterClass().getName());
+		startCommandValues.put("redirects",
+			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
+			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
+		startCommandValues.put("args", "");
+
+		final String commandTemplate = flinkConfiguration
+			.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
+		final String amCommand =
+			BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
 
-		amCommand += " " + getApplicationMasterClass().getName() + " "
-			+ " 1>"
-			+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out"
-			+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
 		amContainer.setCommands(Collections.singletonList(amCommand));
 
 		LOG.debug("Application Master start command: " + amCommand);

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4139a4/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 9838e6d..467917e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -30,6 +32,8 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 
+import static org.junit.Assert.assertEquals;
+
 public class YarnClusterDescriptorTest {
 
 	@Rule
@@ -89,4 +93,149 @@ public class YarnClusterDescriptorTest {
 			Assert.assertTrue(e.getCause() instanceof IllegalConfigurationException);
 		}
 	}
+
+	@Test
+	public void testSetupApplicationMasterContainer() {
+		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
+		final Configuration cfg = new Configuration();
+		clusterDescriptor.setFlinkConfiguration(cfg);
+
+		final String java = "$JAVA_HOME/bin/java";
+		final String jvmmem = "-Xmx424m";
+		final String jvmOpts = "-Djvm"; // if set
+		final String krb5 = "-Djava.security.krb5.conf=krb5.conf";
+		final String logfile =
+			"-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+				"/jobmanager.log\""; // if set
+		final String logback =
+			"-Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME; // if set
+		final String log4j =
+			"-Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME; // if set
+		final String mainClass = clusterDescriptor.getApplicationMasterClass().getName();
+		final String args = "";
+		final String redirects =
+			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
+			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
+
+		// no logging, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + // logging
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(false, false, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 +// jvmOpts
+				" " + // logging
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(false, false, true)
+				.getCommands().get(0));
+
+		// logback only, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + logback +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, false, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 +// jvmOpts
+				" " + logfile + " " + logback +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, false, true)
+				.getCommands().get(0));
+
+		// log4j, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(false, true, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 +// jvmOpts
+				" " + logfile + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(false, true, true)
+				.getCommands().get(0));
+
+		// logback + log4j, with/out krb5
+		assertEquals(
+			java + " " + jvmmem +
+				" " + // jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + " " + krb5 +// jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, true)
+				.getCommands().get(0));
+
+		// logback + log4j, with/out krb5, different JVM opts
+		cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " "  + args + " "+ redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, false)
+				.getCommands().get(0));
+
+		assertEquals(
+			java + " " + jvmmem +
+				" " + jvmOpts + " " + krb5 +// jvmOpts
+				" " + logfile + " " + logback + " " + log4j +
+				" " + mainClass + " "  + args + " "+ redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, true)
+				.getCommands().get(0));
+
+		// now try some configurations with different yarn.container-start-command-template
+
+		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
+		assertEquals(
+			java + " 1 " + jvmmem +
+				" 2 " + jvmOpts + " " + krb5 + // jvmOpts
+				" 3 " + logfile + " " + logback + " " + log4j +
+				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, true)
+				.getCommands().get(0));
+
+		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+			"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+		assertEquals(
+			java +
+				" " + logfile + " " + logback + " " + log4j +
+				" " + jvmOpts + " " + krb5 + // jvmOpts
+				" " + jvmmem +
+				" " + mainClass + " " + args + " " + redirects,
+			clusterDescriptor
+				.setupApplicationMasterContainer(true, true, true)
+				.getCommands().get(0));
+	}
 }