You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 16:15:01 UTC

[2/5] flink git commit: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the client and gives
a better separation of concerns.

This closes #5216.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/156b8935
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/156b8935
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/156b8935

Branch: refs/heads/master
Commit: 156b8935ef76eb53456cea1d40fd528ccefa21d8
Parents: 2ce5b98
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 20 16:43:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 16:59:29 2018 +0100

----------------------------------------------------------------------
 .../client/deployment/ClusterDescriptor.java    |   2 +-
 .../Flip6StandaloneClusterDescriptor.java       |   5 +
 .../deployment/StandaloneClusterDescriptor.java |   5 +
 ...CliFrontendYarnAddressConfigurationTest.java |  80 ++--
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   6 +-
 .../yarn/TestingYarnClusterDescriptor.java      |   7 +-
 .../java/org/apache/flink/yarn/YARNITCase.java  |  47 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  81 ++--
 .../flink/yarn/YarnClusterDescriptorTest.java   |  94 ++--
 .../yarn/AbstractYarnClusterDescriptor.java     |  87 ++--
 .../apache/flink/yarn/YarnClusterClient.java    |  14 +-
 .../flink/yarn/YarnClusterDescriptor.java       |   9 +-
 .../flink/yarn/YarnClusterDescriptorV2.java     |   9 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 451 ++++++++++---------
 15 files changed, 483 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index a62ceff..1603930 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
  */
-public interface ClusterDescriptor<ClientType extends ClusterClient> {
+public interface ClusterDescriptor<ClientType extends ClusterClient> extends AutoCloseable {
 
 	/**
 	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index 9d88f59..b8eb534 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -60,4 +60,9 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestC
 	public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
 	}
+
+	@Override
+	public void close() throws Exception {
+		// nothing to do
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 51e267a..3808efa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -59,4 +59,9 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	public StandaloneClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
 	}
+
+	@Override
+	public void close() throws Exception {
+		// nothing to do
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 1b457a5..56087a1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.AfterClass;
@@ -375,12 +374,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 			private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 
 				public TestingYarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-					super(flinkConfiguration, configurationDirectory);
-				}
-
-				@Override
-				public YarnClient getYarnClient() {
-					return new TestYarnClient();
+					super(
+						flinkConfiguration,
+						configurationDirectory,
+						new TestYarnClient(finalApplicationStatus));
 				}
 
 				@Override
@@ -388,52 +385,51 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 						AbstractYarnClusterDescriptor descriptor,
 						int numberTaskManagers,
 						int slotsPerTaskManager,
-						YarnClient yarnClient,
 						ApplicationReport report,
 						Configuration flinkConfiguration,
 						boolean perJobCluster) throws IOException, YarnException {
 
 					return Mockito.mock(YarnClusterClient.class);
 				}
+			}
+		}
 
-				private class TestYarnClient extends YarnClientImpl {
-
-					private final List<ApplicationReport> reports = new LinkedList<>();
-
-					TestYarnClient() {
-						{   // a report that of our Yarn application we want to resume from
-							ApplicationReport report = Mockito.mock(ApplicationReport.class);
-							Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
-							Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
-							Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
-							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
-							this.reports.add(report);
-						}
-						{   // a second report, just for noise
-							ApplicationReport report = Mockito.mock(ApplicationReport.class);
-							Mockito.when(report.getHost()).thenReturn("1.2.3.4");
-							Mockito.when(report.getRpcPort()).thenReturn(-123);
-							Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
-							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
-							this.reports.add(report);
-						}
-					}
+		private static class TestYarnClient extends YarnClientImpl {
 
-					@Override
-					public List<ApplicationReport> getApplications() throws YarnException, IOException {
-						return reports;
-					}
+			private final List<ApplicationReport> reports = new LinkedList<>();
 
-					@Override
-					public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
-						for (ApplicationReport report : reports) {
-							if (report.getApplicationId().equals(appId)) {
-								return report;
-							}
-						}
-						throw new YarnException();
+			TestYarnClient(FinalApplicationStatus finalApplicationStatus) {
+				{   // a report of the Yarn application we want to resume from
+					ApplicationReport report = Mockito.mock(ApplicationReport.class);
+					Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
+					Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
+					Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
+					Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+					this.reports.add(report);
+				}
+				{   // a second report, just for noise
+					ApplicationReport report = Mockito.mock(ApplicationReport.class);
+					Mockito.when(report.getHost()).thenReturn("1.2.3.4");
+					Mockito.when(report.getRpcPort()).thenReturn(-123);
+					Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
+					Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+					this.reports.add(report);
+				}
+			}
+
+			@Override
+			public List<ApplicationReport> getApplications() throws YarnException, IOException {
+				return reports;
+			}
+
+			@Override
+			public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+				for (ApplicationReport report : reports) {
+					if (report.getApplicationId().equals(appId)) {
+						return report;
 					}
 				}
+				throw new YarnException();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 8eef8f0..3fe8d2f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -174,7 +174,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
 			public JarAgnosticClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-				super(flinkConfiguration, configurationDirectory);
+				super(
+					flinkConfiguration,
+					configurationDirectory,
+					YarnClient.createYarnClient());
 			}
 
 			@Override
@@ -202,7 +205,6 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			super(descriptor,
 				numberTaskManagers,
 				slotsPerTaskManager,
-				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
 				false);

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 30d2798..e66d2e0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.util.ArrayList;
@@ -36,7 +38,10 @@ import java.util.List;
 public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
 	public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
-		super(configuration, configurationDirectory);
+		super(
+			configuration,
+			configurationDirectory,
+			YarnClient.createYarnClient());
 		List<File> filesToShip = new ArrayList<>();
 
 		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index bc28c5b..069f68a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -49,35 +50,43 @@ public class YARNITCase extends YarnTestBase {
 
 	@Ignore("The cluster cannot be stopped yet.")
 	@Test
-	public void testPerJobMode() {
+	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-		YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(configuration, System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR));
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 
-		yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		try (final YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(
+			configuration,
+			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
+			yarnClient)) {
 
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setSlotsPerTaskManager(1)
-			.setNumberTaskManagers(1)
-			.createClusterSpecification();
+			yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
+			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(768)
+				.setTaskManagerMemoryMB(1024)
+				.setSlotsPerTaskManager(1)
+				.setNumberTaskManagers(1)
+				.createClusterSpecification();
 
-		env.addSource(new InfiniteSource())
-			.shuffle()
-			.addSink(new DiscardingSink<Integer>());
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(2);
 
-		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			env.addSource(new InfiniteSource())
+				.shuffle()
+				.addSink(new DiscardingSink<Integer>());
 
-		File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+			File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
 
-		YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+
+			YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+
+			clusterClient.shutdown();
+		}
 	}
 
 	private static class InfiniteSource implements ParallelSourceFunction<Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index dd56f2f..ec8ef50 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -227,51 +227,56 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		Configuration configuration = GlobalConfiguration.loadConfiguration();
-		AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(configuration, confDirPath);
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setNumberTaskManagers(1)
-			.setSlotsPerTaskManager(1)
-			.createClusterSpecification();
+		try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+			configuration,
+			confDirPath,
+			yarnClient)) {
+			Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
+			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-		// deploy
-		ClusterClient yarnCluster = null;
-		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
-		} catch (Exception e) {
-			LOG.warn("Failing test", e);
-			Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
-		}
-		GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
-		for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(768)
+				.setTaskManagerMemoryMB(1024)
+				.setNumberTaskManagers(1)
+				.setSlotsPerTaskManager(1)
+				.createClusterSpecification();
+			// deploy
+			ClusterClient yarnCluster = null;
 			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				LOG.warn("Interrupted", e);
-			}
-			GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-			if (status != null && status.equals(expectedStatus)) {
-				LOG.info("ClusterClient reached status " + status);
-				break; // all good, cluster started
+				yarnCluster = clusterDescriptor.deploySessionCluster(clusterSpecification);
+			} catch (Exception e) {
+				LOG.warn("Failing test", e);
+				Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
 			}
-			if (second > waitTime) {
-				// we waited for 15 seconds. cluster didn't come up correctly
-				Assert.fail("The custer didn't start after " + waitTime + " seconds");
+			GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
+			for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted", e);
+				}
+				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+				if (status != null && status.equals(expectedStatus)) {
+					LOG.info("ClusterClient reached status " + status);
+					break; // all good, cluster started
+				}
+				if (second > waitTime) {
+					// we waited for 15 seconds. cluster didn't come up correctly
+					Assert.fail("The custer didn't start after " + waitTime + " seconds");
+				}
 			}
-		}
 
-		// use the cluster
-		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+			// use the cluster
+			Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+			Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
 
-		LOG.info("Shutting down cluster. All tests passed");
-		// shutdown cluster
-		yarnCluster.shutdown();
+			LOG.info("Shutting down cluster. All tests passed");
+			// shutdown cluster
+			yarnCluster.shutdown();
+		}
 		LOG.info("Finished testJavaAPI()");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index f3e48c5..5144550 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -50,32 +51,40 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testExplicitLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
-		descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			temporaryFolder.getRoot().getAbsolutePath(),
+			YarnClient.createYarnClient());
 
-		File libFile = temporaryFolder.newFile("libFile.jar");
-		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+		try {
+			descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+
+			File libFile = temporaryFolder.newFile("libFile.jar");
+			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
 
-		Assert.assertFalse(descriptor.shipFiles.contains(libFile));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 
-		List<File> shipFiles = new ArrayList<>();
-		shipFiles.add(libFile);
-		shipFiles.add(libFolder);
+			List<File> shipFiles = new ArrayList<>();
+			shipFiles.add(libFile);
+			shipFiles.add(libFolder);
 
-		descriptor.addShipFiles(shipFiles);
+			descriptor.addShipFiles(shipFiles);
 
-		Assert.assertTrue(descriptor.shipFiles.contains(libFile));
-		Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
 
-		// only execute part of the deployment to test for shipped files
-		Set<File> effectiveShipFiles = new HashSet<>();
-		descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			// only execute part of the deployment to test for shipped files
+			Set<File> effectiveShipFiles = new HashSet<>();
+			descriptor.addLibFolderToShipFiles(effectiveShipFiles);
 
-		Assert.assertEquals(0, effectiveShipFiles.size());
-		Assert.assertEquals(2, descriptor.shipFiles.size());
-		Assert.assertTrue(descriptor.shipFiles.contains(libFile));
-		Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+			Assert.assertEquals(0, effectiveShipFiles.size());
+			Assert.assertEquals(2, descriptor.shipFiles.size());
+			Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+		} finally {
+			descriptor.close();
+		}
 	}
 
 	/**
@@ -83,30 +92,37 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testEnvironmentLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			temporaryFolder.getRoot().getAbsolutePath(),
+			YarnClient.createYarnClient());
 
-		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
-		File libFile = new File(libFolder, "libFile.jar");
-		libFile.createNewFile();
-
-		Set<File> effectiveShipFiles = new HashSet<>();
-
-		final Map<String, String> oldEnv = System.getenv();
 		try {
-			Map<String, String> env = new HashMap<>(1);
-			env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
-			TestBaseUtils.setEnv(env);
-			// only execute part of the deployment to test for shipped files
-			descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+			File libFile = new File(libFolder, "libFile.jar");
+			libFile.createNewFile();
+
+			Set<File> effectiveShipFiles = new HashSet<>();
+
+			final Map<String, String> oldEnv = System.getenv();
+			try {
+				Map<String, String> env = new HashMap<>(1);
+				env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+				TestBaseUtils.setEnv(env);
+				// only execute part of the deployment to test for shipped files
+				descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			} finally {
+				TestBaseUtils.setEnv(oldEnv);
+			}
+
+			// only ship the folder itself, not its contents
+			Assert.assertFalse(effectiveShipFiles.contains(libFile));
+			Assert.assertTrue(effectiveShipFiles.contains(libFolder));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 		} finally {
-			TestBaseUtils.setEnv(oldEnv);
+			descriptor.close();
 		}
-
-		// only add the ship the folder, not the contents
-		Assert.assertFalse(effectiveShipFiles.contains(libFile));
-		Assert.assertTrue(effectiveShipFiles.contains(libFolder));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFile));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 86ddd9b..0372319 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -108,14 +108,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
 	private static final int MIN_TM_MEMORY = 768;
 
-	private Configuration conf = new YarnConfiguration();
+	private final YarnConfiguration yarnConfiguration;
 
-	/**
-	 * If the user has specified a different number of slots, we store them here
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
+	private final YarnClient yarnClient;
 
 	private String yarnQueue;
 
@@ -128,7 +123,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/** Lazily initialized list of files to ship. */
 	protected List<File> shipFiles = new LinkedList<>();
 
-	private final org.apache.flink.configuration.Configuration flinkConfiguration;
+	private final Configuration flinkConfiguration;
 
 	private boolean detached;
 
@@ -143,31 +138,48 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
 	public AbstractYarnClusterDescriptor(
-		org.apache.flink.configuration.Configuration flinkConfiguration,
-		String configurationDirectory) {
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient) {
+
+		yarnConfiguration = new YarnConfiguration();
+
 		// for unit tests only
 		if (System.getenv("IN_TESTS") != null) {
 			try {
-				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+				yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
 			} catch (Throwable t) {
 				throw new RuntimeException("Error", t);
 			}
 		}
 
+		this.yarnClient = Preconditions.checkNotNull(yarnClient);
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
 		this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
 		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 	}
 
+	public YarnClient getYarnClient() {
+		return yarnClient;
+	}
+
 	/**
-	 * The class to bootstrap the application master of the Yarn cluster (runs main method).
+	 * The class to start the application master with. This class runs the main
+	 * method in case of session cluster.
 	 */
 	protected abstract String getYarnSessionClusterEntrypoint();
 
+	/**
+	 * The class to start the application master with. This class runs the main
+	 * method in case of the job cluster.
+	 */
 	protected abstract String getYarnJobClusterEntrypoint();
 
-	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+	public Configuration getFlinkConfiguration() {
 		return flinkConfiguration;
 	}
 
@@ -257,7 +269,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Check if we don't exceed YARN's maximum virtual cores.
 		// The number of cores can be configured in the config.
 		// If not configured, it is set to the number of task slots
-		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+		int numYarnVcores = yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
 		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnVcores) {
@@ -304,21 +316,22 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.zookeeperNamespace = zookeeperNamespace;
 	}
 
-	/**
-	 * Gets a Hadoop Yarn client.
-	 * @return Returns a YarnClient which has to be shutdown manually
-	 */
-	public YarnClient getYarnClient() {
-		YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(conf);
-		yarnClient.start();
-		return yarnClient;
+	// -------------------------------------------------------------
+	// Lifecycle management
+	// -------------------------------------------------------------
+
+	@Override
+	public void close() {
+		yarnClient.stop();
 	}
 
+	// -------------------------------------------------------------
+	// ClusterDescriptor overrides
+	// -------------------------------------------------------------
+
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
-		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -329,7 +342,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -349,14 +361,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				this,
 				-1, // we don't know the number of task managers of a started Flink cluster
 				-1, // we don't know how many slots each task manager has for a started Flink cluster
-				yarnClient,
 				appReport,
 				flinkConfiguration,
 				false);
 		} catch (Exception e) {
-			if (null != yarnClient) {
-				yarnClient.stop();
-			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -414,8 +422,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		isReadyForDeployment(clusterSpecification);
 
-		final YarnClient yarnClient = getYarnClient();
-
 		// ------------------ Check if the specified queue exists --------------------
 
 		checkYarnQueues(yarnClient);
@@ -442,7 +448,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
 		}
 
-		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+		final int yarnMinAllocationMB = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", 0);
 
 		final ClusterSpecification validClusterSpecification;
 		try {
@@ -477,7 +483,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			this,
 			clusterSpecification.getNumberTaskManagers(),
 			clusterSpecification.getSlotsPerTaskManager(),
-			yarnClient,
 			report,
 			flinkConfiguration,
 			true);
@@ -627,7 +632,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// initialize file system
 		// Copy the application master jar to the filesystem
 		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
+		final FileSystem fs = FileSystem.get(yarnConfiguration);
 		final Path homeDir = fs.getHomeDirectory();
 
 		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
@@ -881,7 +886,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		if (UserGroupInformation.isSecurityEnabled()) {
 			// set HDFS delegation tokens when security is enabled
 			LOG.info("Adding delegation token to the AM container..");
-			Utils.setTokensFor(amContainer, paths, conf);
+			Utils.setTokensFor(amContainer, paths, yarnConfiguration);
 		}
 
 		amContainer.setLocalResources(localResources);
@@ -926,7 +931,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// set classpath from YARN configuration
-		Utils.setupYarnClassPath(conf, appMasterEnv);
+		Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
 
 		amContainer.setEnvironment(appMasterEnv);
 
@@ -1196,7 +1201,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
 			PrintStream ps = new PrintStream(baos);
 
-			YarnClient yarnClient = getYarnClient();
 			YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
 
 			ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
@@ -1223,7 +1227,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
 					q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
 			}
-			yarnClient.stop();
 			return baos.toString();
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't get cluster description", e);
@@ -1411,7 +1414,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			failSessionDuringDeployment(yarnClient, yarnApplication);
 			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
-				FileSystem fs = FileSystem.get(conf);
+				FileSystem fs = FileSystem.get(yarnConfiguration);
 
 				if (!fs.delete(yarnFilesDir, true)) {
 					throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
@@ -1419,7 +1422,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 				fs.close();
 			} catch (IOException e) {
-				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+				LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
 			}
 		}
 	}
@@ -1525,7 +1528,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			AbstractYarnClusterDescriptor descriptor,
 			int numberTaskManagers,
 			int slotsPerTaskManager,
-			YarnClient yarnClient,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
 			boolean perJobCluster) throws Exception {
@@ -1533,7 +1535,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			descriptor,
 			numberTaskManagers,
 			slotsPerTaskManager,
-			yarnClient,
 			report,
 			flinkConfiguration,
 			perJobCluster);

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 80d0943..63421f9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -43,7 +43,6 @@ import akka.util.Timeout;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,8 +66,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
 
-	private YarnClient yarnClient;
-
 	private Thread clientShutdownHook = new ClientShutdownHook();
 
 	//---------- Class internal fields -------------------
@@ -93,7 +90,6 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param clusterDescriptor The descriptor used at cluster creation
 	 * @param numberTaskManagers The number of task managers, -1 if unknown
 	 * @param slotsPerTaskManager Slots per task manager, -1 if unknown
-	 * @param yarnClient Client to talk to YARN
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
 	 * @param newlyCreatedCluster Indicator whether this cluster has just been created
@@ -104,7 +100,6 @@ public class YarnClusterClient extends ClusterClient {
 		final AbstractYarnClusterDescriptor clusterDescriptor,
 		final int numberTaskManagers,
 		final int slotsPerTaskManager,
-		final YarnClient yarnClient,
 		final ApplicationReport appReport,
 		Configuration flinkConfig,
 		boolean newlyCreatedCluster) throws Exception {
@@ -115,7 +110,6 @@ public class YarnClusterClient extends ClusterClient {
 		this.clusterDescriptor = clusterDescriptor;
 		this.numberTaskManagers = numberTaskManagers;
 		this.slotsPerTaskManager = slotsPerTaskManager;
-		this.yarnClient = yarnClient;
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
@@ -328,7 +322,7 @@ public class YarnClusterClient extends ClusterClient {
 			Future<Object> response =
 				Patterns.ask(applicationClient.get(),
 					new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
-							"Flink YARN Client requested shutdown"),
+						"Flink YARN Client requested shutdown"),
 					new Timeout(akkaDuration));
 			Await.ready(response, akkaDuration);
 		} catch (Exception e) {
@@ -349,7 +343,7 @@ public class YarnClusterClient extends ClusterClient {
 		}
 
 		try {
-			ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+			ApplicationReport appReport = clusterDescriptor.getYarnClient().getApplicationReport(appId);
 
 			LOG.info("Application " + appId + " finished with state " + appReport
 				.getYarnApplicationState() + " and final state " + appReport
@@ -368,10 +362,6 @@ public class YarnClusterClient extends ClusterClient {
 		} catch (Exception e) {
 			LOG.warn("Couldn't get final report", e);
 		}
-
-		LOG.info("YARN Client is shutting down");
-		yarnClient.stop(); // actorRunner is using the yarnClient.
-		yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.
 	}
 
 	public boolean hasBeenShutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8759c3e..76f9154 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -22,13 +22,18 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
-	public YarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-		super(flinkConfiguration, configurationDirectory);
+	public YarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient) {
+		super(flinkConfiguration, configurationDirectory, yarnClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index ed04523..6ce192c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -22,14 +22,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
  * new application master for a job under flip-6.
  */
 public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 
-	public YarnClusterDescriptorV2(Configuration flinkConfiguration, String configurationDirectory) {
-		super(flinkConfiguration, configurationDirectory);
+	public YarnClusterDescriptorV2(
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnCLient) {
+		super(flinkConfiguration, configurationDirectory, yarnCLient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5483758..c045082 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -50,6 +50,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -862,10 +863,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 		if (flip6) {
-			return new YarnClusterDescriptorV2(configuration, configurationDirectory);
+			return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
 		} else {
-			return new YarnClusterDescriptor(configuration, configurationDirectory);
+			return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index c11c413..0d1bf65 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,9 +60,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() {
 
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			new Configuration(),
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -81,6 +85,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			if (!(e.getCause() instanceof IllegalConfigurationException)) {
 				throw e;
 			}
+		} finally {
+			clusterDescriptor.close();
 		}
 	}
 
@@ -90,9 +96,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		// overwrite vcores in config
 		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -113,15 +122,19 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			if (!(e.getCause() instanceof IllegalConfigurationException)) {
 				throw e;
 			}
+		} finally {
+			clusterDescriptor.close();
 		}
 	}
 
 	@Test
 	public void testSetupApplicationMasterContainer() {
 		Configuration cfg = new Configuration();
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			cfg,
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xmx424m";
@@ -142,219 +155,223 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
 		final int jobManagerMemory = 1024;
 
-		// no logging, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + // logging
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					false,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + // logging
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					false,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback only, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + logback +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					false,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					false,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// log4j, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5, different JVM opts
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
-		// because we have a reference to the ClusterDescriptor's configuration which we modify continuously
-		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5, different JVM opts
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + jmJvmOpts +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// now try some configurations with different yarn.container-start-command-template
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
-		assertEquals(
-			java + " 1 " + jvmmem +
-				" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" 3 " + logfile + " " + logback + " " + log4j +
-				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-			"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		assertEquals(
-			java +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" " + jvmmem +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
+		try {
+			// no logging, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + // logging
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						false,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + // logging
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						false,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback only, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + logback +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						false,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						false,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// log4j, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5, different JVM opts
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
+			// because we have a reference to the ClusterDescriptor's configuration which we modify continuously
+			cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5, different JVM opts
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + jmJvmOpts +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// now try some configurations with different yarn.container-start-command-template
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
+			assertEquals(
+				java + " 1 " + jvmmem +
+					" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" 3 " + logfile + " " + logback + " " + log4j +
+					" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			assertEquals(
+				java +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" " + jvmmem +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+		} finally {
+			clusterDescriptor.close();
+		}
 	}
 }