You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/26 23:24:03 UTC

flink git commit: [FLINK-7113] Make ClusterDescriptor independent of cluster size

Repository: flink
Updated Branches:
  refs/heads/master 5e19a0da6 -> 7cf997d16


[FLINK-7113] Make ClusterDescriptor independent of cluster size

The deploySession method now is given a ClusterSpecification which specifies the
size of the cluster which it is supposed to deploy.

Remove 2 line breaks, unnecessary parameters for YarnTestBase#Runner, add builder for ClusterSpecification

This closes #4271.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf997d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf997d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf997d1

Branch: refs/heads/master
Commit: 7cf997d16bbf8c7a6bbb3522aeba0d9e7c9af6b6
Parents: 5e19a0d
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 14:00:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 26 18:55:10 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  56 ++--
 .../org/apache/flink/client/cli/DefaultCLI.java |   5 +-
 .../client/deployment/ClusterDescriptor.java    |   3 +-
 .../client/deployment/ClusterSpecification.java | 119 +++++++
 .../deployment/StandaloneClusterDescriptor.java |   2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   5 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  39 ++-
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  13 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  13 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   8 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 318 ++++++++++---------
 .../apache/flink/yarn/YarnClusterClient.java    |  19 +-
 .../apache/flink/yarn/YarnClusterClientV2.java  |  25 +-
 .../org/apache/flink/yarn/cli/FlinkYarnCLI.java |  10 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  94 +++---
 .../flink/yarn/YarnClusterDescriptorTest.java   | 108 +++++--
 16 files changed, 540 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f89b016..4c7f6a8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -90,7 +90,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -126,16 +125,27 @@ public class CliFrontend {
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();
+	private static final List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(3);
 
 	static {
 		//	Command line interface of the YARN session, with a special initialization here
 		//	to prefix all options with y/yarn.
 		//	Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
 		//	      active CustomCommandLine in order and DefaultCLI isActive always return true.
-		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
-		loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn");
-		customCommandLine.add(new DefaultCLI());
+		final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
+		final String flinkYarnCLI = "org.apache.flink.yarn.cli.FlinkYarnCLI";
+		try {
+			customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn"));
+		} catch (Exception e) {
+			LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+		}
+
+		try {
+			customCommandLines.add(loadCustomCommandLine(flinkYarnCLI, "y", "yarn"));
+		} catch (Exception e) {
+			LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
+		}
+		customCommandLines.add(new DefaultCLI());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1172,7 +1182,7 @@ public class CliFrontend {
 	 * @return custom command-line which is active (may only be one at a time)
 	 */
 	public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
-		for (CustomCommandLine cli : customCommandLine) {
+		for (CustomCommandLine cli : customCommandLines) {
 			if (cli.isActive(commandLine, config)) {
 				return cli;
 			}
@@ -1184,8 +1194,8 @@ public class CliFrontend {
 	 * Retrieves the loaded custom command-lines.
 	 * @return An unmodifiyable list of loaded custom command-lines.
 	 */
-	public static List<CustomCommandLine> getCustomCommandLineList() {
-		return Collections.unmodifiableList(customCommandLine);
+	public static List<CustomCommandLine<?>> getCustomCommandLineList() {
+		return Collections.unmodifiableList(customCommandLines);
 	}
 
 	/**
@@ -1193,29 +1203,21 @@ public class CliFrontend {
 	 * @param className The fully-qualified class name to load.
 	 * @param params The constructor parameters
 	 */
-	private static void loadCustomCommandLine(String className, Object... params) {
+	private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
 
-		try {
-			Class<? extends CustomCommandLine> customCliClass =
-				Class.forName(className).asSubclass(CustomCommandLine.class);
-
-			// construct class types from the parameters
-			Class<?>[] types = new Class<?>[params.length];
-			for (int i = 0; i < params.length; i++) {
-				Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
-				types[i] = params[i].getClass();
-			}
+		Class<? extends CustomCommandLine> customCliClass =
+			Class.forName(className).asSubclass(CustomCommandLine.class);
 
-			Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
-			final CustomCommandLine cli = constructor.newInstance(params);
+		// construct class types from the parameters
+		Class<?>[] types = new Class<?>[params.length];
+		for (int i = 0; i < params.length; i++) {
+			Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+			types[i] = params[i].getClass();
+		}
 
-			customCommandLine.add(cli);
+		Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
 
-		} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
-			| InvocationTargetException e) {
-			LOG.warn("Unable to locate custom CLI class {}. " +
-				"Flink is not compiled with support for this class.", className, e);
-		}
+		return constructor.newInstance(params);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 042a44a..ec55c91 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -83,6 +84,8 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
 			List<URL> userJarFiles) throws UnsupportedOperationException {
 
 		StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
-		return descriptor.deploySessionCluster();
+		ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
+
+		return descriptor.deploySessionCluster(clusterSpecification);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index e2ffafa..ba83607 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -42,10 +42,11 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
 
 	/**
 	 * Triggers deployment of a cluster.
+	 * @param clusterSpecification Cluster specification defining the cluster to deploy
 	 * @return Client for the cluster
 	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */
-	ClientType deploySessionCluster() throws UnsupportedOperationException;
+	ClientType deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
 
 	/**
 	 * Deploys a per-job cluster with the given job on the cluster.

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
new file mode 100644
index 0000000..8650cab
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public final class ClusterSpecification {
+	private final int masterMemoryMB;
+	private final int taskManagerMemoryMB;
+	private final int numberTaskManagers;
+	private final int slotsPerTaskManager;
+
+	private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
+		this.masterMemoryMB = masterMemoryMB;
+		this.taskManagerMemoryMB = taskManagerMemoryMB;
+		this.numberTaskManagers = numberTaskManagers;
+		this.slotsPerTaskManager = slotsPerTaskManager;
+	}
+
+	public int getMasterMemoryMB() {
+		return masterMemoryMB;
+	}
+
+	public int getTaskManagerMemoryMB() {
+		return taskManagerMemoryMB;
+	}
+
+	public int getNumberTaskManagers() {
+		return numberTaskManagers;
+	}
+
+	public int getSlotsPerTaskManager() {
+		return slotsPerTaskManager;
+	}
+
+	@Override
+	public String toString() {
+		return "ClusterSpecification{" +
+			"masterMemoryMB=" + masterMemoryMB +
+			", taskManagerMemoryMB=" + taskManagerMemoryMB +
+			", numberTaskManagers=" + numberTaskManagers +
+			", slotsPerTaskManager=" + slotsPerTaskManager +
+			'}';
+	}
+
+	public static ClusterSpecification fromConfiguration(Configuration configuration) {
+		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
+		return new ClusterSpecificationBuilder()
+			.setMasterMemoryMB(jobManagerMemoryMb)
+			.setTaskManagerMemoryMB(taskManagerMemoryMb)
+			.setNumberTaskManagers(1)
+			.setSlotsPerTaskManager(slots)
+			.createClusterSpecification();
+	}
+
+	/**
+	 * Builder for the {@link ClusterSpecification} instance.
+	 */
+	public static class ClusterSpecificationBuilder {
+		private int masterMemoryMB = 768;
+		private int taskManagerMemoryMB = 768;
+		private int numberTaskManagers = 1;
+		private int slotsPerTaskManager = 1;
+
+		public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
+			this.masterMemoryMB = masterMemoryMB;
+			return this;
+		}
+
+		public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
+			this.taskManagerMemoryMB = taskManagerMemoryMB;
+			return this;
+		}
+
+		public ClusterSpecificationBuilder setNumberTaskManagers(int numberTaskManagers) {
+			this.numberTaskManagers = numberTaskManagers;
+			return this;
+		}
+
+		public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
+			this.slotsPerTaskManager = slotsPerTaskManager;
+			return this;
+		}
+
+		public ClusterSpecification createClusterSpecification() {
+			return new ClusterSpecification(
+				masterMemoryMB,
+				taskManagerMemoryMB,
+				numberTaskManagers,
+				slotsPerTaskManager);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index ebdd0a8..435692f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -51,7 +51,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public StandaloneClusterClient deploySessionCluster() throws UnsupportedOperationException {
+	public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
 		throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index d59e5b4..6811511 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.commons.cli.CommandLine;
@@ -66,7 +67,7 @@ import static org.junit.Assert.assertEquals;
  * Tests that verify that the CLI client picks up the correct address for the JobManager
  * from configuration and configs.
  */
-public class CliFrontendYarnAddressConfigurationTest {
+public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -378,6 +379,8 @@ public class CliFrontendYarnAddressConfigurationTest {
 				@Override
 				protected YarnClusterClient createYarnClusterClient(
 						AbstractYarnClusterDescriptor descriptor,
+						int numberTaskManagers,
+						int slotsPerTaskManager,
 						YarnClient yarnClient,
 						ApplicationReport report,
 						Configuration flinkConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a2497a1..f6c8bbb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -21,10 +21,11 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.commons.cli.CommandLine;
@@ -48,7 +49,7 @@ import java.util.Map;
 /**
  * Tests for the FlinkYarnSessionCli.
  */
-public class FlinkYarnSessionCliTest {
+public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder tmp = new TemporaryFolder();
@@ -57,12 +58,11 @@ public class FlinkYarnSessionCliTest {
 	public void testDynamicProperties() throws Exception {
 
 		Map<String, String> map = new HashMap<String, String>(System.getenv());
-		File tmpFolder = tmp.newFolder();
-		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
-		fakeConf.createNewFile();
-		map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath());
 		TestBaseUtils.setEnv(map);
-		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+			"",
+			"",
+			false);
 		Options options = new Options();
 		cli.addGeneralOptions(options);
 		cli.addRunOptions(options);
@@ -96,11 +96,11 @@ public class FlinkYarnSessionCliTest {
 
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 
-		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+		ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
-		Assert.assertEquals(4, descriptor.getTaskManagerSlots());
-		Assert.assertEquals(2, descriptor.getTaskManagerCount());
+		Assert.assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
+		Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
 	}
 
 	@Test
@@ -118,14 +118,19 @@ public class FlinkYarnSessionCliTest {
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 
 		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+		final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
-		Assert.assertEquals(3, descriptor.getTaskManagerSlots());
-		Assert.assertEquals(2, descriptor.getTaskManagerCount());
+		Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
+		Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
 
 		Configuration config = new Configuration();
 		CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
-		ClusterClient client = new TestingYarnClusterClient(descriptor, config);
+		ClusterClient client = new TestingYarnClusterClient(
+			descriptor,
+			clusterSpecification.getNumberTaskManagers(),
+			clusterSpecification.getSlotsPerTaskManager(),
+			config);
 		Assert.assertEquals(6, client.getMaxSlots());
 	}
 
@@ -170,8 +175,14 @@ public class FlinkYarnSessionCliTest {
 
 	private static class TestingYarnClusterClient extends YarnClusterClient {
 
-		public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws Exception {
+		public TestingYarnClusterClient(
+				AbstractYarnClusterDescriptor descriptor,
+				int numberTaskManagers,
+				int slotsPerTaskManager,
+				Configuration config) throws Exception {
 			super(descriptor,
+				numberTaskManagers,
+				slotsPerTaskManager,
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index df8133d..308cdc1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
@@ -107,9 +108,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor();
 
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(768);
-		flinkYarnClient.setTaskManagerMemory(1024);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
@@ -136,8 +134,15 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 		HighAvailabilityServices highAvailabilityServices = null;
 
+		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(768)
+			.setTaskManagerMemoryMB(1024)
+			.setNumberTaskManagers(1)
+			.setSlotsPerTaskManager(1)
+			.createClusterSpecification();
+
 		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster();
+			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
 
 			final ClusterClient finalYarnCluster = yarnCluster;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 8e1e877..2bbaadf 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -225,9 +226,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setTaskManagerCount(1);
-		flinkYarnClient.setJobManagerMemory(768);
-		flinkYarnClient.setTaskManagerMemory(1024);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
@@ -235,10 +233,17 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
+		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(768)
+			.setTaskManagerMemoryMB(1024)
+			.setNumberTaskManagers(1)
+			.setSlotsPerTaskManager(1)
+			.createClusterSpecification();
+
 		// deploy
 		ClusterClient yarnCluster = null;
 		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster();
+			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
 		} catch (Exception e) {
 			LOG.warn("Failing test", e);
 			Assert.fail("Error while deploying YARN cluster: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 68a1509..e701104 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -136,6 +136,8 @@ public abstract class YarnTestBase extends TestLogger {
 	 */
 	protected static File tempConfPathForSecureRun = null;
 
+	protected static org.apache.flink.configuration.Configuration flinkConfiguration = new org.apache.flink.configuration.Configuration();
+
 	static {
 		YARN_CONFIGURATION = new YarnConfiguration();
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -631,6 +633,7 @@ public abstract class YarnTestBase extends TestLogger {
 	protected static class Runner extends Thread {
 		private final String[] args;
 		private final int expectedReturnValue;
+
 		private RunTypes type;
 		private FlinkYarnSessionCli yCli;
 		private Throwable runnerError;
@@ -647,7 +650,10 @@ public abstract class YarnTestBase extends TestLogger {
 				int returnValue;
 				switch (type) {
 					case YARN_SESSION:
-						yCli = new FlinkYarnSessionCli("", "", false);
+						yCli = new FlinkYarnSessionCli(
+							"",
+							"",
+							false);
 						returnValue = yCli.run(args);
 						break;
 					case CLI_FRONTEND:

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c5277a1..51b7600 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -27,7 +28,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -115,17 +115,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 */
 	private Path sessionFilesDir;
 
-	/**
-	 * If the user has specified a different number of slots, we store them here.
-	 */
-	private int slots = -1;
-
-	private int jobManagerMemoryMb = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
-
-	private int taskManagerMemoryMb = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue();
-
-	private int taskManagerCount = 1;
-
 	private String yarnQueue;
 
 	private String configurationDirectory;
@@ -174,11 +163,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 			flinkConfigurationPath = new Path(confFile.getAbsolutePath());
 
-			slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-
-			jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-			taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-
 			userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 		} catch (Exception e) {
 			LOG.debug("Config couldn't be loaded from environment variable.", e);
@@ -190,22 +174,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 */
 	protected abstract Class<?> getApplicationMasterClass();
 
-	public void setJobManagerMemory(int memoryMb) {
-		if (memoryMb < MIN_JM_MEMORY) {
-			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
-				+ "of " + MIN_JM_MEMORY + " MB");
-		}
-		this.jobManagerMemoryMb = memoryMb;
-	}
-
-	public void setTaskManagerMemory(int memoryMb) {
-		if (memoryMb < MIN_TM_MEMORY) {
-			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
-				+ "of " + MIN_TM_MEMORY + " MB");
-		}
-		this.taskManagerMemoryMb = memoryMb;
-	}
-
 	public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
 		this.flinkConfiguration = conf;
 
@@ -216,17 +184,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return flinkConfiguration;
 	}
 
-	public void setTaskManagerSlots(int slots) {
-		if (slots <= 0) {
-			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
-		}
-		this.slots = slots;
-	}
-
-	public int getTaskManagerSlots() {
-		return this.slots;
-	}
-
 	public void setQueue(String queue) {
 		this.yarnQueue = queue;
 	}
@@ -246,17 +203,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.configurationDirectory = configurationDirectory;
 	}
 
-	public void setTaskManagerCount(int tmCount) {
-		if (tmCount < 1) {
-			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
-		}
-		this.taskManagerCount = tmCount;
-	}
-
-	public int getTaskManagerCount() {
-		return this.taskManagerCount;
-	}
-
 	public void addShipFiles(List<File> shipFiles) {
 		for (File shipFile: shipFiles) {
 			// remove uberjar from ship list (by default everything in the lib/ folder is added to
@@ -311,8 +257,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return this.dynamicPropertiesEncoded;
 	}
 
-	private void isReadyForDeployment() throws YarnDeploymentException {
-		if (taskManagerCount <= 0) {
+	private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
+
+		if (clusterSpecification.getNumberTaskManagers() <= 0) {
 			throw new YarnDeploymentException("Taskmanager count must be positive");
 		}
 		if (this.flinkJarPath == null) {
@@ -332,7 +279,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// The number of cores can be configured in the config.
 		// If not configured, it is set to the number of task slots
 		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
-		int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, slots);
+		int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnVcores) {
 			throw new IllegalConfigurationException(
@@ -419,7 +366,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
 			flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
 
-			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
+			return createYarnClusterClient(
+				this,
+				-1, // we don't know the number of task managers of a started Flink cluster
+				-1, // we don't know how many slots each task manager has for a started Flink cluster
+				yarnClient,
+				appReport,
+				flinkConfiguration,
+				false);
 		} catch (Exception e) {
 			if (null != yarnClient) {
 				yarnClient.stop();
@@ -429,7 +383,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public YarnClusterClient deploySessionCluster() {
+	public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
 		try {
 			if (UserGroupInformation.isSecurityEnabled()) {
 				// note: UGI::hasKerberosCredentials inaccurately reports false
@@ -445,66 +399,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 							"does not have Kerberos credentials");
 				}
 			}
-			return deployInternal();
+			return deployInternal(clusterSpecification);
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't deploy Yarn cluster", e);
 		}
 	}
 
-	/**
-	 * This method will block until the ApplicationMaster/JobManager have been
-	 * deployed on YARN.
-	 */
-	protected YarnClusterClient deployInternal() throws Exception {
-		isReadyForDeployment();
-		LOG.info("Using values:");
-		LOG.info("\tTaskManager count = {}", taskManagerCount);
-		LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
-		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
+	protected ClusterSpecification validateClusterResources(
+			ClusterSpecification clusterSpecification,
+			int yarnMinAllocationMB,
+			Resource maximumResourceCapability,
+			ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
 
-		final YarnClient yarnClient = getYarnClient();
-
-		// ------------------ Check if the specified queue exists --------------------
+		int taskManagerCount = clusterSpecification.getNumberTaskManagers();
+		int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
+		int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
 
-		try {
-			List<QueueInfo> queues = yarnClient.getAllQueues();
-			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
-				boolean queueFound = false;
-				for (QueueInfo queue : queues) {
-					if (queue.getQueueName().equals(this.yarnQueue)) {
-						queueFound = true;
-						break;
-					}
-				}
-				if (!queueFound) {
-					String queueNames = "";
-					for (QueueInfo queue : queues) {
-						queueNames += queue.getQueueName() + ", ";
-					}
-					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
-						"Available queues: " + queueNames);
-				}
-			} else {
-				LOG.debug("The YARN cluster does not have any queues configured");
-			}
-		} catch (Throwable e) {
-			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Error details", e);
-			}
+		if (jobManagerMemoryMb < MIN_JM_MEMORY) {
+			LOG.warn("The minimum JobManager memory is {}. Will set the JobManager memory to this value.", MIN_JM_MEMORY);
+			jobManagerMemoryMb = MIN_JM_MEMORY;
 		}
 
-		// ------------------ Add dynamic properties to local flinkConfiguraton ------
-		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
-		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
-			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+		if (taskManagerMemoryMb < MIN_TM_MEMORY) {
+			LOG.warn("The minimum TaskManager memory is {}. Will set the Taskmanager memory to this value.", MIN_TM_MEMORY);
+			taskManagerMemoryMb = MIN_TM_MEMORY;
 		}
 
-		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
-
-		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
-		// all allocations below this value are automatically set to this value.
-		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
 		if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
 			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
 				+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
@@ -520,22 +440,15 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			taskManagerMemoryMb =  yarnMinAllocationMB;
 		}
 
-		// Create application via yarnClient
-		final YarnClientApplication yarnApplication = yarnClient.createApplication();
-		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
-		Resource maxRes = appResponse.getMaximumResourceCapability();
 		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
-		if (jobManagerMemoryMb > maxRes.getMemory()) {
-			failSessionDuringDeployment(yarnClient, yarnApplication);
+		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
 			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
-				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
+				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
 		}
 
-		if (taskManagerMemoryMb > maxRes.getMemory()) {
-			failSessionDuringDeployment(yarnClient, yarnApplication);
+		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
 			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
-				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
+				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
 		}
 
 		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
@@ -543,49 +456,107 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem;
-		try {
-			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		} catch (YarnException | IOException e) {
-			failSessionDuringDeployment(yarnClient, yarnApplication);
-			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
-		}
 
-		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
-				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
 
 		}
-		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
 			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
+				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
 		}
-		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
 			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
-				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
+				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
 		}
 
 		// ----------------- check if the requested containers fit into the cluster.
 
-		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
 		// first, allocate the jobManager somewhere.
 		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
 			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
 				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
-				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
+				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
 		}
 		// allocate TaskManagers
 		for (int i = 0; i < taskManagerCount; i++) {
 			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
 				LOG.warn("There is not enough memory available in the YARN cluster. " +
 					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
-					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
 					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
 					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
 			}
 		}
 
-		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
+		return new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(jobManagerMemoryMb)
+			.setTaskManagerMemoryMB(taskManagerMemoryMb)
+			.setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
+			.setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
+			.createClusterSpecification();
+
+	}
+
+	/**
+	 * This method will block until the ApplicationMaster/JobManager have been
+	 * deployed on YARN.
+	 */
+	protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception {
+
+		isReadyForDeployment(clusterSpecification);
+
+		final YarnClient yarnClient = getYarnClient();
+
+		// ------------------ Check if the specified queue exists --------------------
+
+		checkYarnQueues(yarnClient);
+
+		// ------------------ Add dynamic properties to local flinkConfiguraton ------
+		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
+		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
+			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+		}
+
+		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
+
+		// Create application via yarnClient
+		final YarnClientApplication yarnApplication = yarnClient.createApplication();
+		final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+		Resource maxRes = appResponse.getMaximumResourceCapability();
+
+		final ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
+		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+
+		final ClusterSpecification validClusterSpecification;
+		try {
+			validClusterSpecification = validateClusterResources(
+				clusterSpecification,
+				yarnMinAllocationMB,
+				maxRes,
+				freeClusterMem);
+		} catch (YarnDeploymentException yde) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw yde;
+		}
+
+		LOG.info("Cluster specification: {}", validClusterSpecification);
+
+		ApplicationReport report = startAppMaster(
+			null,
+			yarnClient,
+			yarnApplication,
+			clusterSpecification);
 
 		String host = report.getHost();
 		int port = report.getRpcPort();
@@ -595,10 +566,51 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
-		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
+		return createYarnClusterClient(
+			this,
+			clusterSpecification.getNumberTaskManagers(),
+			clusterSpecification.getSlotsPerTaskManager(),
+			yarnClient,
+			report,
+			flinkConfiguration,
+			true);
+	}
+
+	private void checkYarnQueues(YarnClient yarnClient) {
+		try {
+			List<QueueInfo> queues = yarnClient.getAllQueues();
+			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
+				boolean queueFound = false;
+				for (QueueInfo queue : queues) {
+					if (queue.getQueueName().equals(this.yarnQueue)) {
+						queueFound = true;
+						break;
+					}
+				}
+				if (!queueFound) {
+					String queueNames = "";
+					for (QueueInfo queue : queues) {
+						queueNames += queue.getQueueName() + ", ";
+					}
+					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+						"Available queues: " + queueNames);
+				}
+			} else {
+				LOG.debug("The YARN cluster does not have any queues configured");
+			}
+		} catch (Throwable e) {
+			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Error details", e);
+			}
+		}
 	}
 
-	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
+	public ApplicationReport startAppMaster(
+			JobGraph jobGraph,
+			YarnClient yarnClient,
+			YarnClientApplication yarnApplication,
+			ClusterSpecification clusterSpecification) throws Exception {
 
 		// ------------------ Set default file system scheme -------------------------
 
@@ -802,7 +814,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
 		}
 
-		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
+			hasLogback,
+			hasLog4j,
+			hasKrb5,
+			clusterSpecification.getMasterMemoryMB());
 
 		if (UserGroupInformation.isSecurityEnabled()) {
 			// set HDFS delegation tokens when security is enabled
@@ -821,13 +837,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
 
 		// set Flink on YARN internal configuration values
-		appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+		appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(clusterSpecification.getNumberTaskManagers()));
+		appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
 		appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
+		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
 		appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
@@ -858,12 +874,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		// Set up resource type requirements for ApplicationMaster
 		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(jobManagerMemoryMb);
+		capability.setMemory(clusterSpecification.getMasterMemoryMB());
 		capability.setVirtualCores(1);
 
 		String name;
 		if (customName == null) {
-			name = "Flink session with " + taskManagerCount + " TaskManagers";
+			name = "Flink session with " + clusterSpecification.getNumberTaskManagers() + " TaskManagers";
 			if (detached) {
 				name += " (detached)";
 			}
@@ -1288,9 +1304,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback,
-																	boolean hasLog4j,
-																	boolean hasKrb5) {
+	protected ContainerLaunchContext setupApplicationMasterContainer(
+			boolean hasLogback,
+			boolean hasLog4j,
+			boolean hasKrb5,
+			int jobManagerMemoryMb) {
 		// ------------------ Prepare Application Master Container  ------------------------------
 
 		// respect custom JVM options in the YAML file
@@ -1365,12 +1383,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 */
 	protected YarnClusterClient createYarnClusterClient(
 			AbstractYarnClusterDescriptor descriptor,
+			int numberTaskManagers,
+			int slotsPerTaskManager,
 			YarnClient yarnClient,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
 			boolean perJobCluster) throws Exception {
 		return new YarnClusterClient(
 			descriptor,
+			numberTaskManagers,
+			slotsPerTaskManager,
 			yarnClient,
 			report,
 			flinkConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a435ef7..ceca29d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -78,6 +78,8 @@ public class YarnClusterClient extends ClusterClient {
 	//---------- Class internal fields -------------------
 
 	private final AbstractYarnClusterDescriptor clusterDescriptor;
+	private final int numberTaskManagers;
+	private final int slotsPerTaskManager;
 	private final LazApplicationClientLoader applicationClient;
 	private final FiniteDuration akkaDuration;
 	private final ApplicationReport appReport;
@@ -93,6 +95,8 @@ public class YarnClusterClient extends ClusterClient {
 	 * Create a new Flink on YARN cluster.
 	 *
 	 * @param clusterDescriptor The descriptor used at cluster creation
+	 * @param numberTaskManagers The number of task managers, -1 if unknown
+	 * @param slotsPerTaskManager Slots per task manager, -1 if unknown
 	 * @param yarnClient Client to talk to YARN
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
@@ -102,6 +106,8 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	public YarnClusterClient(
 		final AbstractYarnClusterDescriptor clusterDescriptor,
+		final int numberTaskManagers,
+		final int slotsPerTaskManager,
 		final YarnClient yarnClient,
 		final ApplicationReport appReport,
 		Configuration flinkConfig,
@@ -111,6 +117,8 @@ public class YarnClusterClient extends ClusterClient {
 
 		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
 		this.clusterDescriptor = clusterDescriptor;
+		this.numberTaskManagers = numberTaskManagers;
+		this.slotsPerTaskManager = slotsPerTaskManager;
 		this.yarnClient = yarnClient;
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
@@ -186,7 +194,8 @@ public class YarnClusterClient extends ClusterClient {
 
 	@Override
 	public int getMaxSlots() {
-		int maxSlots = clusterDescriptor.getTaskManagerCount() * clusterDescriptor.getTaskManagerSlots();
+		// TODO: this should be retrieved from the running Flink cluster
+		int maxSlots = numberTaskManagers * slotsPerTaskManager;
 		return maxSlots > 0 ? maxSlots : -1;
 	}
 
@@ -506,8 +515,8 @@ public class YarnClusterClient extends ClusterClient {
 			currentStatus = getClusterStatus();
 			if (currentStatus != null && !currentStatus.equals(lastStatus)) {
 				logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
-					+ clusterDescriptor.getTaskManagerCount() + ")");
-				if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+					+ numberTaskManagers + ")");
+				if (currentStatus.numRegisteredTaskManagers() >= numberTaskManagers) {
 					logAndSysout("All TaskManagers are connected");
 					break;
 				}
@@ -529,14 +538,14 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static class LazApplicationClientLoader {
 
-		private final org.apache.flink.configuration.Configuration flinkConfig;
+		private final Configuration flinkConfig;
 		private final LazyActorSystemLoader actorSystemLoader;
 		private final HighAvailabilityServices highAvailabilityServices;
 
 		private ActorRef applicationClient;
 
 		private LazApplicationClientLoader(
-				org.apache.flink.configuration.Configuration flinkConfig,
+				Configuration flinkConfig,
 				LazyActorSystemLoader actorSystemLoader,
 				HighAvailabilityServices highAvailabilityServices) {
 			this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index f58e6aa..d1af6dc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -26,10 +26,7 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +76,7 @@ public class YarnClusterClientV2 extends ClusterClient {
 
 	@Override
 	public int getMaxSlots() {
-        // Now need not set max slot
+        // No need not set max slot
 		return 0;
 	}
 
@@ -90,25 +87,7 @@ public class YarnClusterClientV2 extends ClusterClient {
 
 	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		try {
-			// Create application via yarnClient
-			final YarnClientApplication yarnApplication = yarnClient.createApplication();
-			ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient, yarnApplication);
-			if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
-				appId = report.getApplicationId();
-				trackingURL = report.getTrackingUrl();
-				logAndSysout("Please refer to " + getWebInterfaceURL()
-						+ " for the running status of job " +  jobGraph.getJobID().toString());
-				//TODO: not support attach mode now
-				return new JobSubmissionResult(jobGraph.getJobID());
-			}
-			else {
-				throw new ProgramInvocationException("Fail to submit the job.");
-			}
-		}
-		catch (Exception e) {
-			throw new ProgramInvocationException("Fail to submit the job", e.getCause());
-		}
+		throw new UnsupportedOperationException("Not yet implemented.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index aaa9bac..af49fa8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -144,12 +144,6 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
 		}
 
-		// JobManager Memory
-		if (cmd.hasOption(jmMemory.getOpt())) {
-			int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
-			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
-		}
-
 		String[] dynamicProperties = null;
 		if (cmd.hasOption(this.dynamicProperties.getOpt())) {
 			dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt());
@@ -232,7 +226,9 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
 		yarnClusterDescriptor.setFlinkConfiguration(config);
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
-		return new YarnClusterClientV2(yarnClusterDescriptor, config);
+		return new YarnClusterClientV2(
+			yarnClusterDescriptor,
+			config);
 	}
 
 	private void logAndSysout(String message) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 60223ef..a887e54 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -20,11 +20,14 @@ package org.apache.flink.yarn.cli;
 
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -258,13 +261,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
 
-		if (!cmd.hasOption(container.getOpt())) { // number of containers is required option!
-			LOG.error("Missing required argument {}", container.getOpt());
-			printUsage();
-			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
-		}
-		yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(container.getOpt())));
-
 		// Jar Path
 		Path localJarPath;
 		if (cmd.hasOption(flinkJar.getOpt())) {
@@ -309,23 +305,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
 		}
 
-		// JobManager Memory
-		if (cmd.hasOption(jmMemory.getOpt())) {
-			int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
-			yarnClusterDescriptor.setJobManagerMemory(jmMemory);
-		}
-
-		// Task Managers memory
-		if (cmd.hasOption(tmMemory.getOpt())) {
-			int tmMemory = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt()));
-			yarnClusterDescriptor.setTaskManagerMemory(tmMemory);
-		}
-
-		if (cmd.hasOption(slots.getOpt())) {
-			int slots = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt()));
-			yarnClusterDescriptor.setTaskManagerSlots(slots);
-		}
-
 		String[] dynamicProperties = null;
 		if (cmd.hasOption(dynamicproperties.getOpt())) {
 			dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt());
@@ -353,28 +332,60 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
 		}
 
-		// ----- Convenience -----
+		return yarnClusterDescriptor;
+	}
+
+	public ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
+		if (!cmd.hasOption(container.getOpt())) { // number of containers is required option!
+			LOG.error("Missing required argument {}", container.getOpt());
+			printUsage();
+			throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+		}
+
+		int numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
 
-		// the number of slots available from YARN:
-		int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots();
-		if (yarnTmSlots == -1) {
-			yarnTmSlots = 1;
-			yarnClusterDescriptor.setTaskManagerSlots(yarnTmSlots);
+		// JobManager Memory
+		final int jobManagerMemoryMB;
+		if (cmd.hasOption(jmMemory.getOpt())) {
+			jobManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
+		} else {
+			jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
 		}
 
-		int maxSlots = yarnTmSlots * yarnClusterDescriptor.getTaskManagerCount();
+		// Task Managers memory
+		final int taskManagerMemoryMB;
+		if (cmd.hasOption(tmMemory.getOpt())) {
+			taskManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt()));
+		} else {
+			taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		}
+
+		int slotsPerTaskManager;
+		if (cmd.hasOption(slots.getOpt())) {
+			slotsPerTaskManager = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt()));
+		} else {
+			slotsPerTaskManager = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		}
+
+		// convenience
 		int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1"));
+		int maxSlots = slotsPerTaskManager * numberTaskManagers;
 		if (userParallelism != -1) {
-			int slotsPerTM = (int) Math.ceil((double) userParallelism / yarnClusterDescriptor.getTaskManagerCount());
+			int slotsPerTM = (int) Math.ceil((double) userParallelism / numberTaskManagers);
 			String message = "The YARN cluster has " + maxSlots + " slots available, " +
 				"but the user requested a parallelism of " + userParallelism + " on YARN. " +
-				"Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +
+				"Each of the " + numberTaskManagers + " TaskManagers " +
 				"will get " + slotsPerTM + " slots.";
 			logAndSysout(message);
-			yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM);
+			slotsPerTaskManager = slotsPerTM;
 		}
 
-		return yarnClusterDescriptor;
+		return new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(jobManagerMemoryMB)
+			.setTaskManagerMemoryMB(taskManagerMemoryMB)
+			.setNumberTaskManagers(numberTaskManagers)
+			.setSlotsPerTaskManager(slotsPerTaskManager)
+			.createClusterSpecification();
 	}
 
 	private void printUsage() {
@@ -477,8 +488,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	public static void main(final String[] args) throws Exception {
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
 		Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
 		SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfiguration));
 		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
 			@Override
@@ -547,11 +558,12 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
 
 		AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+		ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
 		yarnClusterDescriptor.setFlinkConfiguration(config);
 		yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
 
 		try {
-			return yarnClusterDescriptor.deploySessionCluster();
+			return yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
 		} catch (Exception e) {
 			throw new RuntimeException("Error deploying the YARN cluster", e);
 		}
@@ -626,8 +638,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				return 1;
 			}
 
+			final ClusterSpecification clusterSpecification = createClusterSpecification(yarnDescriptor.getFlinkConfiguration(), cmd);
+
 			try {
-				yarnCluster = yarnDescriptor.deploySessionCluster();
+				yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
 			} catch (Exception e) {
 				System.err.println("Error while deploying YARN cluster: " + e.getMessage());
 				e.printStackTrace(System.err);
@@ -646,9 +660,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 			Properties yarnProps = new Properties();
 			yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
-			if (yarnDescriptor.getTaskManagerSlots() != -1) {
+			if (clusterSpecification.getSlotsPerTaskManager() != -1) {
 				String parallelism =
-						Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
+						Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
 				yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
 			}
 			// add dynamic properties

http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 14738e6..a862b50 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.hadoop.fs.Path;
@@ -40,7 +42,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the {@link YarnClusterDescriptor}.
  */
-public class YarnClusterDescriptorTest {
+public class YarnClusterDescriptorTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -65,11 +67,15 @@ public class YarnClusterDescriptorTest {
 		clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
 		clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
 
-		// configure slots too high
-		clusterDescriptor.setTaskManagerSlots(Integer.MAX_VALUE);
+		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(-1)
+			.setTaskManagerMemoryMB(-1)
+			.setNumberTaskManagers(1)
+			.setSlotsPerTaskManager(Integer.MAX_VALUE)
+			.createClusterSpecification();
 
 		try {
-			clusterDescriptor.deploySessionCluster();
+			clusterDescriptor.deploySessionCluster(clusterSpecification);
 
 			fail("The deploy call should have failed.");
 		} catch (RuntimeException e) {
@@ -95,10 +101,15 @@ public class YarnClusterDescriptorTest {
 		clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
 
 		// configure slots
-		clusterDescriptor.setTaskManagerSlots(1);
+		ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+			.setMasterMemoryMB(-1)
+			.setTaskManagerMemoryMB(-1)
+			.setNumberTaskManagers(1)
+			.setSlotsPerTaskManager(1)
+			.createClusterSpecification();
 
 		try {
-			clusterDescriptor.deploySessionCluster();
+			clusterDescriptor.deploySessionCluster(clusterSpecification);
 
 			fail("The deploy call should have failed.");
 		} catch (RuntimeException e) {
@@ -132,6 +143,7 @@ public class YarnClusterDescriptorTest {
 		final String redirects =
 			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
 			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
+		final int jobManagerMemory = 1024;
 
 		// no logging, with/out krb5
 		assertEquals(
@@ -140,7 +152,11 @@ public class YarnClusterDescriptorTest {
 				" " + // logging
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(false, false, false)
+				.setupApplicationMasterContainer(
+					false,
+					false,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -149,7 +165,11 @@ public class YarnClusterDescriptorTest {
 				" " + // logging
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(false, false, true)
+				.setupApplicationMasterContainer(
+					false,
+					false,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// logback only, with/out krb5
@@ -159,7 +179,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, false, false)
+				.setupApplicationMasterContainer(
+					true,
+					false,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -168,7 +192,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, false, true)
+				.setupApplicationMasterContainer(
+					true,
+					false,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// log4j, with/out krb5
@@ -178,7 +206,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(false, true, false)
+				.setupApplicationMasterContainer(
+					false,
+					true,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -187,7 +219,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(false, true, true)
+				.setupApplicationMasterContainer(
+					false,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5
@@ -197,7 +233,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, false)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -206,7 +246,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, true)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
@@ -217,7 +261,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, false)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -226,7 +274,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, true)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// logback + log4j, with/out krb5, different JVM opts
@@ -237,7 +289,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, false)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					false,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		assertEquals(
@@ -246,7 +302,11 @@ public class YarnClusterDescriptorTest {
 				" " + logfile + " " + logback + " " + log4j +
 				" " + mainClass + " "  + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, true)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		// now try some configurations with different yarn.container-start-command-template
@@ -259,7 +319,11 @@ public class YarnClusterDescriptorTest {
 				" 3 " + logfile + " " + logback + " " + log4j +
 				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, true)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 
 		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
@@ -271,7 +335,11 @@ public class YarnClusterDescriptorTest {
 				" " + jvmmem +
 				" " + mainClass + " " + args + " " + redirects,
 			clusterDescriptor
-				.setupApplicationMasterContainer(true, true, true)
+				.setupApplicationMasterContainer(
+					true,
+					true,
+					true,
+					jobManagerMemory)
 				.getCommands().get(0));
 	}
 }