You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/28 10:17:38 UTC

flink git commit: [FLINK-7125] [yarn] Remove Configuration loading from AbstractYarnClusterDescriptor

Repository: flink
Updated Branches:
  refs/heads/master fa16e9a81 -> e97514090


[FLINK-7125] [yarn] Remove Configuration loading from AbstractYarnClusterDescriptor

Instead, the AbstractYarnClusterDescriptor is passed a Configuration instance, which
is sent to the started application master.

Pass in configuration directory manually

Remove configurationDirectory resolution from AbstractYarnClusterDescriptor

Address PR comments

This closes #4280.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e9751409
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e9751409
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e9751409

Branch: refs/heads/master
Commit: e9751409033870658a12a8edb4cf11a84fd04e3d
Parents: fa16e9a
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jul 7 13:40:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 28 12:16:58 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 17 ++++-
 .../flink/client/cli/CustomCommandLine.java     | 19 +++---
 .../org/apache/flink/client/cli/DefaultCLI.java |  6 +-
 .../org/apache/flink/api/scala/FlinkShell.scala |  6 +-
 ...CliFrontendYarnAddressConfigurationTest.java | 10 ++-
 .../flink/yarn/FlinkYarnSessionCliTest.java     | 48 ++++++++-----
 .../yarn/TestingYarnClusterDescriptor.java      |  4 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java  | 11 ++-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  9 ++-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 15 ++--
 .../org/apache/flink/yarn/YarnTestBase.java     | 35 ++++++++--
 .../yarn/AbstractYarnClusterDescriptor.java     | 72 ++++++++------------
 .../flink/yarn/YarnClusterDescriptor.java       |  5 ++
 .../flink/yarn/YarnClusterDescriptorV2.java     |  5 ++
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 15 ++--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 51 +++++++++-----
 .../flink/yarn/YarnClusterDescriptorTest.java   | 32 ++++-----
 17 files changed, 221 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 4c7f6a8..2e140df 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -152,6 +152,8 @@ public class CliFrontend {
 
 	private final Configuration config;
 
+	private final String configurationDirectory;
+
 	private final FiniteDuration clientTimeout;
 
 	private final int defaultParallelism;
@@ -165,6 +167,7 @@ public class CliFrontend {
 	}
 
 	public CliFrontend(String configDir) throws Exception {
+		configurationDirectory = Preconditions.checkNotNull(configDir);
 
 		// configure the config directory
 		File configDirectory = new File(configDir);
@@ -204,6 +207,15 @@ public class CliFrontend {
 		return copiedConfiguration;
 	}
 
+	/**
+	 * Returns the configuration directory for the CLI frontend.
+	 *
+	 * @return Configuration directory
+	 */
+	public String getConfigurationDirectory() {
+		return configurationDirectory;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Execute Actions
 	// --------------------------------------------------------------------------------------------
@@ -906,7 +918,7 @@ public class CliFrontend {
 	protected ClusterClient retrieveClient(CommandLineOptions options) {
 		CustomCommandLine customCLI = getActiveCustomCommandLine(options.getCommandLine());
 		try {
-			ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config);
+			ClusterClient client = customCLI.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 			logAndSysout("Using address " + client.getJobManagerAddress() + " to connect to JobManager.");
 			return client;
 		} catch (Exception e) {
@@ -943,7 +955,7 @@ public class CliFrontend {
 
 		ClusterClient client;
 		try {
-			client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
+			client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
 			logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
 		} catch (UnsupportedOperationException e) {
 			try {
@@ -952,6 +964,7 @@ public class CliFrontend {
 					applicationName,
 					options.getCommandLine(),
 					config,
+					configurationDirectory,
 					program.getAllLibraries());
 				logAndSysout("Cluster started: " + client.getClusterIdentifier());
 			} catch (UnsupportedOperationException e2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index 9ddaf9e..517fc71 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -62,25 +62,28 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	 * Retrieves a client for a running cluster.
 	 * @param commandLine The command-line parameters from the CliFrontend
 	 * @param config The Flink config
+	 * @param configurationDirectory Directory for configuration files
 	 * @return Client if a cluster could be retrieved
 	 * @throws UnsupportedOperationException if the operation is not supported
 	 */
 	ClusterType retrieveCluster(
-			CommandLine commandLine,
-			Configuration config) throws UnsupportedOperationException;
+		CommandLine commandLine,
+		Configuration config,
+		String configurationDirectory) throws UnsupportedOperationException;
 
 	/**
 	 * Creates the client for the cluster.
 	 * @param applicationName The application name to use
 	 * @param commandLine The command-line options parsed by the CliFrontend
 	 * @param config The Flink config to use
-	 * @param userJarFiles User jar files to include in the classpath of the cluster.
-	 * @return The client to communicate with the cluster which the CustomCommandLine brought up.
+	 * @param configurationDirectory Directory for configuration files
+	 *@param userJarFiles User jar files to include in the classpath of the cluster.  @return The client to communicate with the cluster which the CustomCommandLine brought up.
 	 * @throws Exception if the cluster could not be created
 	 */
 	ClusterType createCluster(
-			String applicationName,
-			CommandLine commandLine,
-			Configuration config,
-			List<URL> userJarFiles) throws Exception;
+		String applicationName,
+		CommandLine commandLine,
+		Configuration config,
+		String configurationDirectory,
+		List<URL> userJarFiles) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index ec55c91..a5d8a30 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -59,7 +59,10 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 	}
 
 	@Override
-	public StandaloneClusterClient retrieveCluster(CommandLine commandLine, Configuration config) {
+	public StandaloneClusterClient retrieveCluster(
+			CommandLine commandLine,
+			Configuration config,
+			String configurationDirectory) {
 
 		if (commandLine.hasOption(CliFrontendParser.ADDRESS_OPTION.getOpt())) {
 			String addressWithPort = commandLine.getOptionValue(CliFrontendParser.ADDRESS_OPTION.getOpt());
@@ -81,6 +84,7 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 			String applicationName,
 			CommandLine commandLine,
 			Configuration config,
+			String configurationDirectory,
 			List<URL> userJarFiles) throws UnsupportedOperationException {
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index a5c5860..60eaccc 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -257,6 +257,7 @@ object FlinkShell {
       "Flink Scala Shell",
       options.getCommandLine,
       config,
+      frontend.getConfigurationDirectory,
       Collections.emptyList())
 
     val address = cluster.getJobManagerAddress.getAddress.getHostAddress
@@ -277,7 +278,10 @@ object FlinkShell {
     val config = frontend.getConfiguration
     val customCLI = frontend.getActiveCustomCommandLine(options.getCommandLine)
 
-    val cluster = customCLI.retrieveCluster(options.getCommandLine, config)
+    val cluster = customCLI.retrieveCluster(
+      options.getCommandLine,
+      config,
+      frontend.getConfigurationDirectory)
 
     if (cluster == null) {
       throw new RuntimeException("Yarn Cluster could not be retrieved.")

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 6811511..0d57e20 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -362,8 +362,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 
 			@Override
 			// override cluster descriptor to replace the YarnClient
-			protected AbstractYarnClusterDescriptor getClusterDescriptor() {
-				return new TestingYarnClusterDescriptor();
+			protected AbstractYarnClusterDescriptor getClusterDescriptor(
+					Configuration configuration,
+					String configurationDirecotry) {
+				return new TestingYarnClusterDescriptor(configuration, configurationDirecotry);
 			}
 
 			/**
@@ -371,6 +373,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 			 */
 			private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 
+				public TestingYarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+					super(flinkConfiguration, configurationDirectory);
+				}
+
 				@Override
 				protected YarnClient getYarnClient() {
 					return new TestYarnClient();

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index f6c8bbb..c7c25ff 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
@@ -43,7 +42,6 @@ import org.mockito.Mockito;
 
 import java.io.File;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -57,8 +55,6 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	@Test
 	public void testDynamicProperties() throws Exception {
 
-		Map<String, String> map = new HashMap<String, String>(System.getenv());
-		TestBaseUtils.setEnv(map);
 		FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
 			"",
 			"",
@@ -71,7 +67,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		CommandLine cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15",
 				"-D", "akka.ask.timeout=5 min", "-D", "env.java.opts=-DappName=foobar"});
 
-		AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(null, cmd);
+		AbstractYarnClusterDescriptor flinkYarnDescriptor = cli.createDescriptor(
+			new Configuration(),
+			tmp.getRoot().getAbsolutePath(),
+			null,
+			cmd);
 
 		Assert.assertNotNull(flinkYarnDescriptor);
 
@@ -84,10 +84,7 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testNotEnoughTaskSlots() throws Exception {
-
-		File confFile = tmp.newFile("flink-conf.yaml");
 		File jarFile = tmp.newFile("test.jar");
-		new CliFrontend(tmp.getRoot().getAbsolutePath());
 
 		String[] params =
 			new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()};
@@ -108,7 +105,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		File confFile = tmp.newFile("flink-conf.yaml");
 		File jarFile = tmp.newFile("test.jar");
-		new CliFrontend(tmp.getRoot().getAbsolutePath());
+		CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
+
+		final Configuration config = cliFrontend.getConfiguration();
 
 		String[] params =
 			new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()};
@@ -117,14 +116,22 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 
-		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
-		final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
+		final Configuration configuration = new Configuration();
+
+		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
+			configuration,
+			tmp.getRoot().getAbsolutePath(),
+			"",
+			runOptions.getCommandLine());
+
+		final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(
+			configuration,
+			runOptions.getCommandLine());
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
 		Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
 
-		Configuration config = new Configuration();
 		CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
 		ClusterClient client = new TestingYarnClusterClient(
 			descriptor,
@@ -139,7 +146,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		File confFile = tmp.newFile("flink-conf.yaml");
 		File jarFile = tmp.newFile("test.jar");
-		new CliFrontend(tmp.getRoot().getAbsolutePath());
+		CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
+		final Configuration configuration = cliFrontend.getConfiguration();
 
 		String zkNamespaceCliInput = "flink_test_namespace";
 
@@ -149,7 +157,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
 
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
-		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
+			configuration,
+			tmp.getRoot().getAbsolutePath(),
+			"",
+			runOptions.getCommandLine());
 
 		Assert.assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}
@@ -161,6 +173,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		}
 
 		private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
+			public JarAgnosticClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+				super(flinkConfiguration, configurationDirectory);
+			}
+
 			@Override
 			public void setLocalJarPath(Path localJarPath) {
 				// add nothing
@@ -168,8 +184,8 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		}
 
 		@Override
-		protected AbstractYarnClusterDescriptor getClusterDescriptor() {
-			return new JarAgnosticClusterDescriptor();
+		protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) {
+			return new JarAgnosticClusterDescriptor(configuration, configurationDirectory);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index cde13ff..b2fe133 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -33,7 +34,8 @@ import java.util.List;
  */
 public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
-	public TestingYarnClusterDescriptor() {
+	public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
+		super(configuration, configurationDirectory);
 		List<File> filesToShip = new ArrayList<>();
 
 		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 308cdc1..897dc8c 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -104,29 +105,25 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 	@Test
 	public void testMultipleAMKill() throws Exception {
 		final int numberKillingAttempts = numberApplicationAttempts - 1;
-
-		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor();
+		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+		final Configuration configuration = GlobalConfiguration.loadConfiguration();
+		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(configuration, confDirPath);
 
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-
 		String fsStateHandlePath = temp.getRoot().getPath();
 
 		// load the configuration
 		File configDirectory = new File(confDirPath);
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
-		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
 			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		ClusterClient yarnCluster = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 2bbaadf..dd56f2f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
@@ -224,14 +225,12 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		final int waitTime = 15;
 		LOG.info("Starting testJavaAPI()");
 
-		AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
+		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+		Configuration configuration = GlobalConfiguration.loadConfiguration();
+		AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(configuration, confDirPath);
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
-		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 			.setMasterMemoryMB(768)

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index c40a300..f3e48c5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
@@ -39,7 +40,7 @@ import java.util.Set;
 /**
  * Tests for the YarnClusterDescriptor.
  */
-public class YarnClusterDescriptorTest {
+public class YarnClusterDescriptorTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -49,13 +50,9 @@ public class YarnClusterDescriptorTest {
 	 */
 	@Test
 	public void testExplicitLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
 		descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
 
-		descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
-		descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
-		descriptor.setFlinkConfiguration(new Configuration());
-
 		File libFile = temporaryFolder.newFile("libFile.jar");
 		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
 
@@ -86,11 +83,7 @@ public class YarnClusterDescriptorTest {
 	 */
 	@Test
 	public void testEnvironmentLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
-
-		descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
-		descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
-		descriptor.setFlinkConfiguration(new Configuration());
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
 
 		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
 		File libFile = new File(libFolder, "libFile.jar");

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index e701104..62ff955 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
@@ -136,8 +137,6 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static File tempConfPathForSecureRun = null;
 
-	protected static org.apache.flink.configuration.Configuration flinkConfiguration = new org.apache.flink.configuration.Configuration();
-
 	static {
 		YARN_CONFIGURATION = new YarnConfiguration();
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -185,6 +184,8 @@ public abstract class YarnTestBase extends TestLogger {
 	}
 
 	private YarnClient yarnClient = null;
+	protected org.apache.flink.configuration.Configuration flinkConfiguration;
+
 	@Before
 	public void checkClusterEmpty() throws IOException, YarnException {
 		if (yarnClient == null) {
@@ -202,6 +203,8 @@ public abstract class YarnTestBase extends TestLogger {
 						"App " + app.getApplicationId() + " is in state " + app.getYarnApplicationState());
 			}
 		}
+
+		flinkConfiguration = new org.apache.flink.configuration.Configuration();
 	}
 
 	/**
@@ -498,7 +501,12 @@ public abstract class YarnTestBase extends TestLogger {
 
 		final int startTimeoutSeconds = 60;
 
-		Runner runner = new Runner(args, type, 0);
+		Runner runner = new Runner(
+			args,
+			flinkConfiguration,
+			CliFrontend.getConfigurationDirectoryFromEnv(),
+			type,
+			0);
 		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
 		runner.start();
 
@@ -551,7 +559,12 @@ public abstract class YarnTestBase extends TestLogger {
 		final int startTimeoutSeconds = 180;
 		final long deadline = System.currentTimeMillis() + (startTimeoutSeconds * 1000);
 
-		Runner runner = new Runner(args, type, expectedReturnValue);
+		Runner runner = new Runner(
+			args,
+			flinkConfiguration,
+			CliFrontend.getConfigurationDirectoryFromEnv(),
+			type,
+			expectedReturnValue);
 		runner.start();
 
 		boolean expectedStringSeen = false;
@@ -632,14 +645,24 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static class Runner extends Thread {
 		private final String[] args;
+		private final org.apache.flink.configuration.Configuration configuration;
+		private final String configurationDirectory;
 		private final int expectedReturnValue;
 
 		private RunTypes type;
 		private FlinkYarnSessionCli yCli;
 		private Throwable runnerError;
 
-		public Runner(String[] args, RunTypes type, int expectedReturnValue) {
+		public Runner(
+				String[] args,
+				org.apache.flink.configuration.Configuration configuration,
+				String configurationDirectory,
+				RunTypes type,
+				int expectedReturnValue) {
+
 			this.args = args;
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 			this.type = type;
 			this.expectedReturnValue = expectedReturnValue;
 		}
@@ -654,7 +677,7 @@ public abstract class YarnTestBase extends TestLogger {
 							"",
 							"",
 							false);
-						returnValue = yCli.run(args);
+						returnValue = yCli.run(args, configuration, configurationDirectory);
 						break;
 					case CLI_FRONTEND:
 						TestingCLI cli;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 51b7600..908e91e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -119,8 +117,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private String configurationDirectory;
 
-	private Path flinkConfigurationPath;
-
 	private Path flinkJarPath;
 
 	private String dynamicPropertiesEncoded;
@@ -128,7 +124,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/** Lazily initialized list of files to ship. */
 	protected List<File> shipFiles = new LinkedList<>();
 
-	private org.apache.flink.configuration.Configuration flinkConfiguration;
+	private final org.apache.flink.configuration.Configuration flinkConfiguration;
 
 	private boolean detached;
 
@@ -142,7 +138,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
-	public AbstractYarnClusterDescriptor() {
+	public AbstractYarnClusterDescriptor(
+		org.apache.flink.configuration.Configuration flinkConfiguration,
+		String configurationDirectory) {
 		// for unit tests only
 		if (System.getenv("IN_TESTS") != null) {
 			try {
@@ -152,21 +150,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		// tries to load the config through the environment, if it fails it can still be set through the setters
-		try {
-			this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
-			this.flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
-
-			File confFile = new File(configurationDirectory + File.separator + GlobalConfiguration.FLINK_CONF_FILENAME);
-			if (!confFile.exists()) {
-				throw new RuntimeException("Unable to locate configuration file in " + confFile);
-			}
-			flinkConfigurationPath = new Path(confFile.getAbsolutePath());
+		this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
+		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
-			userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
-		} catch (Exception e) {
-			LOG.debug("Config couldn't be loaded from environment variable.", e);
-		}
+		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 	}
 
 	/**
@@ -174,12 +161,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 */
 	protected abstract Class<?> getApplicationMasterClass();
 
-	public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
-		this.flinkConfiguration = conf;
-
-		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
-	}
-
 	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
 		return flinkConfiguration;
 	}
@@ -195,14 +176,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.flinkJarPath = localJarPath;
 	}
 
-	public void setConfigurationFilePath(Path confPath) {
-		flinkConfigurationPath = confPath;
-	}
-
-	public void setConfigurationDirectory(String configurationDirectory) {
-		this.configurationDirectory = configurationDirectory;
-	}
-
 	public void addShipFiles(List<File> shipFiles) {
 		for (File shipFile: shipFiles) {
 			// remove uberjar from ship list (by default everything in the lib/ folder is added to
@@ -268,9 +241,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		if (this.configurationDirectory == null) {
 			throw new YarnDeploymentException("Configuration directory not set");
 		}
-		if (this.flinkConfigurationPath == null) {
-			throw new YarnDeploymentException("Configuration path not set");
-		}
 		if (this.flinkConfiguration == null) {
 			throw new YarnDeploymentException("Flink configuration object has not been set");
 		}
@@ -734,12 +704,30 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar =
-			Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
-		Path remotePathConf =
-			Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
+		Path remotePathJar = Utils.setupLocalResource(
+			fs,
+			appId.toString(),
+			flinkJarPath,
+			appMasterJar,
+			fs.getHomeDirectory());
+
 		localResources.put("flink.jar", appMasterJar);
+
+		// Upload the flink configuration
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+		// write out configuration file
+		File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
+		tmpConfigurationFile.deleteOnExit();
+		BootstrapTools.writeConfiguration(flinkConfiguration, tmpConfigurationFile);
+
+		Path remotePathConf = Utils.setupLocalResource(
+			fs,
+			appId.toString(),
+			new Path(tmpConfigurationFile.getAbsolutePath()),
+			flinkConf,
+			fs.getHomeDirectory());
+
 		localResources.put("flink-conf.yaml", flinkConf);
 
 		paths.add(remotePathJar);

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index dae0f64..b518f9e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /**
@@ -25,6 +26,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
+	public YarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
+		super(flinkConfiguration, configurationDirectory);
+	}
+
 	@Override
 	protected Class<?> getApplicationMasterClass() {
 		return YarnApplicationMasterRunner.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index 9807273..6a43374 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 /**
@@ -28,6 +29,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
  */
 public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 
+	public YarnClusterDescriptorV2(Configuration flinkConfiguration, String configurationDirectory) {
+		super(flinkConfiguration, configurationDirectory);
+	}
+
 	@Override
 	protected Class<?> getApplicationMasterClass() {
 		return YarnFlinkApplicationMasterRunner.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index af49fa8..62ba207 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn.cli;
 
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.Configuration;
@@ -96,9 +97,12 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 		allOptions.addOption(zookeeperNamespace);
 	}
 
-	public YarnClusterDescriptorV2 createDescriptor(String defaultApplicationName, CommandLine cmd) {
+	public YarnClusterDescriptorV2 createDescriptor(
+			Configuration configuration,
+			String defaultApplicationName,
+			CommandLine cmd) {
 
-		YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2();
+		YarnClusterDescriptorV2 yarnClusterDescriptor = new YarnClusterDescriptorV2(configuration, CliFrontend.getConfigurationDirectoryFromEnv());
 
 		// Jar Path
 		Path localJarPath;
@@ -209,7 +213,8 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 	@Override
 	public YarnClusterClientV2 retrieveCluster(
 			CommandLine cmdLine,
-			Configuration config) throws UnsupportedOperationException {
+			Configuration config,
+			String configurationDirectory) throws UnsupportedOperationException {
 
 		throw new UnsupportedOperationException("Not support retrieveCluster since Flip-6.");
 	}
@@ -219,11 +224,11 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 			String applicationName,
 			CommandLine cmdLine,
 			Configuration config,
+			String configurationDirectory,
 			List<URL> userJarFiles) throws Exception {
 		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
 
-		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
-		yarnClusterDescriptor.setFlinkConfiguration(config);
+		YarnClusterDescriptorV2 yarnClusterDescriptor = createDescriptor(config, applicationName, cmdLine);
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		return new YarnClusterClientV2(

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index a887e54..4a214e2 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn.cli;
 
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.deployment.ClusterSpecification;
@@ -257,9 +258,15 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		return applicationID;
 	}
 
-	public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
+	public AbstractYarnClusterDescriptor createDescriptor(
+		Configuration configuration,
+		String configurationDirectory,
+		String defaultApplicationName,
+		CommandLine cmd) {
 
-		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
+		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor(
+			configuration,
+			configurationDirectory);
 
 		// Jar Path
 		Path localJarPath;
@@ -488,13 +495,16 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	public static void main(final String[] args) throws Exception {
-		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
 		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
+
+		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+
+		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
 		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfiguration));
 		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 			@Override
 			public Integer call() {
-				return cli.run(args);
+				return cli.run(args, flinkConfiguration, configurationDirectory);
 			}
 		});
 		System.exit(retCode);
@@ -528,7 +538,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	@Override
 	public YarnClusterClient retrieveCluster(
 			CommandLine cmdLine,
-			Configuration config) throws UnsupportedOperationException {
+			Configuration config,
+			String configurationDirectory) throws UnsupportedOperationException {
 
 		// first check for an application id, then try to load from yarn properties
 		String applicationID = cmdLine.hasOption(applicationId.getOpt()) ?
@@ -541,8 +552,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					: config.getString(HighAvailabilityOptions.HA_CLUSTER_ID, applicationID);
 			config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
 
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
-			yarnDescriptor.setFlinkConfiguration(config);
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(config, configurationDirectory);
 			return yarnDescriptor.retrieve(applicationID);
 		} else {
 			throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
@@ -554,12 +564,18 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			String applicationName,
 			CommandLine cmdLine,
 			Configuration config,
+			String configurationDirectory,
 			List<URL> userJarFiles) {
 		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
 
-		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
-		ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
-		yarnClusterDescriptor.setFlinkConfiguration(config);
+		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(
+			config,
+			configurationDirectory,
+			applicationName,
+			cmdLine);
+
+		final ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
+
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		try {
@@ -570,7 +586,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	}
 
-	public int run(String[] args) {
+	public int run(
+			String[] args,
+			Configuration configuration,
+			String configurationDirectory) {
 		//
 		//	Command Line Options
 		//
@@ -590,7 +609,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		// Query cluster for metrics
 		if (cmd.hasOption(query.getOpt())) {
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
 			String description;
 			try {
 				description = yarnDescriptor.getClusterDescription();
@@ -603,7 +622,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			return 0;
 		} else if (cmd.hasOption(applicationId.getOpt())) {
 
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(configuration, configurationDirectory);
 
 			//configure ZK namespace depending on the value passed
 			String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
@@ -631,7 +650,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 			AbstractYarnClusterDescriptor yarnDescriptor;
 			try {
-				yarnDescriptor = createDescriptor(null, cmd);
+				yarnDescriptor = createDescriptor(configuration, configurationDirectory, null, cmd);
 			} catch (Exception e) {
 				System.err.println("Error while starting the YARN Client: " + e.getMessage());
 				e.printStackTrace(System.err);
@@ -745,7 +764,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 	}
 
-	protected AbstractYarnClusterDescriptor getClusterDescriptor() {
-		return new YarnClusterDescriptor();
+	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory) {
+		return new YarnClusterDescriptor(configuration, configurationDirectory);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e9751409/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index a862b50..86122d6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -48,24 +48,21 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	private File flinkJar;
-	private File flinkConf;
 
 	@Before
 	public void beforeTest() throws IOException {
 		temporaryFolder.create();
 		flinkJar = temporaryFolder.newFile("flink.jar");
-		flinkConf = temporaryFolder.newFile("flink-conf.yaml");
 	}
 
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() {
 
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
+		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			temporaryFolder.getRoot().getAbsolutePath());
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-		clusterDescriptor.setFlinkConfiguration(new Configuration());
-		clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
-		clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
 
 		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 			.setMasterMemoryMB(-1)
@@ -88,17 +85,15 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 	@Test
 	public void testConfigOverwrite() {
-
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
-
 		Configuration configuration = new Configuration();
 		// overwrite vcores in config
 		configuration.setInteger(ConfigConstants.YARN_VCORES, Integer.MAX_VALUE);
 
+		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+			configuration,
+			temporaryFolder.getRoot().getAbsolutePath());
+
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
-		clusterDescriptor.setFlinkConfiguration(configuration);
-		clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
-		clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
 
 		// configure slots
 		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
@@ -122,9 +117,10 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 	@Test
 	public void testSetupApplicationMasterContainer() {
-		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor();
-		final Configuration cfg = new Configuration();
-		clusterDescriptor.setFlinkConfiguration(cfg);
+		Configuration cfg = new Configuration();
+		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+			cfg,
+			temporaryFolder.getRoot().getAbsolutePath());
 
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xmx424m";
@@ -254,6 +250,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
+		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
+		// because we have a reference to the ClusterDescriptor's configuration which we modify continuously
 		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
@@ -282,6 +280,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
+		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
 		cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
 		assertEquals(
 			java + " " + jvmmem +
@@ -310,7 +309,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 				.getCommands().get(0));
 
 		// now try some configurations with different yarn.container-start-command-template
-
+		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
 		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
 			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
 		assertEquals(
@@ -328,6 +327,7 @@ public class YarnClusterDescriptorTest extends TestLogger {
 
 		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
 			"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
 		assertEquals(
 			java +
 				" " + logfile + " " + logback + " " + log4j +